[Python] Notes from trying to get started with asyncio

What is asyncio?

asyncio is a library for writing **concurrent** code using the **async/await** syntax.

asyncio is used as a foundation for multiple asynchronous Python frameworks that provide high-performance network and web servers, database connection libraries, distributed task queues, and so on.

asyncio --- Asynchronous I/O — Python 3.9.0 Documentation

import asyncio

async def main():
    print('Hello ...')
    await asyncio.sleep(1)
    print('... World!')

# Python 3.7+
asyncio.run(main())

Wait, isn't this just like C#'s async/await? I use Python, but I'm ashamed to say I didn't know about this until now. I occasionally write C# code, and every time I touched async/await there, I wished Python had it too... Asynchronous programming in C# | Microsoft Docs

Speaking of asynchronous processing in Python

My mental image had been the modules listed under Concurrency — Python 3.9.0 documentation (https://docs.python.org/ja/3/library/concurrency.html). Will asyncio become another option going forward?

Verification environment

Hello World!

The sample at the beginning is code found in the official documentation above. First, let's run it. As the code comment says, it works with **Python 3.7 and above**. No special preparation, such as installing extra libraries, is required.

helloworld.py


import asyncio

async def main():
    print('Hello ...')
    await asyncio.sleep(1)
    print('... World!')

# Python 3.7+
asyncio.run(main())

Terminal


$ python3 helloworld.py
Hello ...
... World!

Confirm that there is a one-second pause between Hello ... and ... World!. The key points are as follows.

- Write the processing equivalent to `main()` as an `async` function (a **coroutine**)
- Inside an `async` function, you can use `await`
- Call `asyncio.run(main())` only once

But this alone is not very satisfying, is it? It is not really asynchronous processing yet.

Example of proper asynchronous processing

So what about this?

async_sleep1.py


import asyncio

async def func1():
    print('func1() started')
    await asyncio.sleep(1)
    print('func1() finished')

async def func2():
    print('func2() started')
    await asyncio.sleep(1)
    print('func2() finished')

async def main():
    task1 = asyncio.create_task(func1())
    task2 = asyncio.create_task(func2())
    await task1
    await task2

asyncio.run(main())

When you run it, func1() and func2() start almost simultaneously, and one second later both finish almost simultaneously.

func1() started
func2() started
func1() finished
func2() finished

Roughly speaking, `await` means **"hand your turn to someone else and wait yourself"**. Here, the "someone else's work" is a Task, which can be created by either of the following methods.

- Call a predefined coroutine such as `asyncio.sleep()`
- Wrap the return value (a coroutine object) of an `async` function with `asyncio.create_task()`

Note that simply writing func1() by itself does nothing; by wrapping it as `asyncio.create_task(func1())`, func1() is scheduled so that it can start running. "Scheduled" does not mean it starts working immediately (more on this later).

asyncio.sleep?

What stands out here is `await asyncio.sleep(1)`. The meaning is clear enough, but when it comes to "waiting n seconds", time.sleep() is the standard tool. So what happens if we swap it in here?

async_sleep2.py


import asyncio
import time

async def func1():
    print('func1() started')
    #await asyncio.sleep(1)
    time.sleep(1)
    print('func1() finished')

async def func2():
    print('func2() started')
    #await asyncio.sleep(1)
    time.sleep(1)
    print('func2() finished')

async def main():
    task1 = asyncio.create_task(func1())
    task2 = asyncio.create_task(func2())
    await task1
    await task2

asyncio.run(main())

Unfortunately, func2() does not start until func1() finishes.

func1() started
func1() finished
func2() started
func2() finished

To run the two processes concurrently, the processing must be suspended with `await asyncio.sleep(1)`.

coroutine asyncio.sleep(delay, result=None, *, loop=None)

Suspends for delay seconds. If result is provided, it is returned to the caller when the coroutine completes. sleep() always suspends the current Task, allowing other Tasks to run.

The point is "always suspends the current Task, allowing other Tasks to run." If you wait by calling `asyncio.sleep()`, you **give up your turn whenever someone else has work to do during the wait.** Conversely, **a process merely taking a long time does not let other processing cut in.**

Another confusing example.

async_sleep3.py


import asyncio
import time

async def func1():
    print('func1() started')
    await asyncio.sleep(1)
    print('func1() finished')

async def func2():
    print('func2() started')
    time.sleep(2)
    print('func2() finished')

async def main():
    task1 = asyncio.create_task(func1())
    task2 = asyncio.create_task(func2())
    await task1
    await task2

asyncio.run(main())

func1() waits using `asyncio.sleep()`, while func2() uses time.sleep(), and func2() sleeps longer. Running this gives...

func1() started
func2() started
func2() finished
func1() finished

It comes out like this. **func1(), which should only sleep for 1 second, somehow finishes after func2().**

Now let's revisit the description in the previous document.

sleep () always suspends the current Task and allows other Tasks to run.

**With `asyncio.sleep()` you can hand your turn to someone else, but you cannot cut back in on your own.** If new work arrives while you are sleeping in `asyncio.sleep()`, control switches to it; but once your sleep time is up, the task that is currently running (and is not inside `asyncio.sleep()`) is not interrupted to hand control back. In the example above, after func1() has waited its 1 second, func2() is still running, so func1() cannot force a switch; the rule is that it waits for func2() to finish before proceeding.[^func2]

[^func2]: Of course, if func2() also gave up its turn with `asyncio.sleep()`, func1() could resume at that point.

Illustrated based on the above

Here is an illustration of how `async_sleep1.py`, which achieves concurrency nicely, is processed. The colored areas represent the time each party actually holds control. In the following, `asyncio.sleep()` is abbreviated as `aio.sleep()`.

image.png

  1. `main()` schedules task1 and task2 with `create_task()`. Neither starts immediately; they wait.
  2. With `await`, `main()` gives up control and waits for task1 to finish. task1 receives control.
  3. task1 releases control by executing `asyncio.sleep(1)` and waits for 1 second. task2 receives control.
  4. task2 releases control by executing `asyncio.sleep(1)` and waits for 1 second. Nothing is left that can run.
  5. Having waited 1 second, task1 receives control.
  6. task1's processing is finished, so `main()` regains control.
  7. With `await`, `main()` gives up control and waits for task2 to finish. Nothing is left that can run.
  8. Having waited 1 second, task2 receives control.
  9. task2's processing is finished, so `main()` regains control.
  10. The processing of `main()` ends.

On the other hand, the processing of `async_sleep3.py` is illustrated as follows.

image.png

With time.sleep(), task2 keeps holding control the whole time. Therefore task1 cannot resume until task2 finishes, even though its 1-second wait is already over.

Around this point, we notice something. **asyncio cannot be used to run other work in parallel while the CPU is busy computing. Multiple processes are not running at the same time; this is, in fact, single-threaded.**

For example, with threading.Thread you can write the following.[^1]

[^1]: In this code the two functions run (apparently) in parallel, but because of the GIL the total processing time is not reduced. If you want multi-core speedups, multiprocessing.Process is a better fit. → I investigated the GIL, which you should know if you want to do parallel processing with Python --Qiita

example_threading.py


import threading

def func1():
    print('func1() started')
    x = 0
    for i in range(1000000):
        x += i
    print('func1() finished:', x)

def func2():
    print('func2() started')
    x = 0
    for i in range(1000000):
        x += i * 2
    print('func2() finished:', x)

def main():
    thread1 = threading.Thread(target=func1)
    thread2 = threading.Thread(target=func2)
    thread1.start()
    thread2.start()
    thread1.join()
    thread2.join()

main()

Terminal


$ python3 example_threading.py
func1() started
func2() started
func1() finished: 499999500000
func2() finished: 999999000000

If you look closely at the name asyncio

It says **io**. In other words, it is a mechanism intended from the start for (apparently) parallel execution of work that mainly waits on I/O (input/output). However, to let other work run in the meantime, you must wait using the dedicated functions; even time.sleep() will not do. The most common kind of I/O wait is probably network send/receive, but judging from the results so far, **unlike threading.Thread, simply writing input/output code as usual will not make it run in parallel. So how are we supposed to use it...?**

To parallelize input and output

What we have seen so far is that to perform input/output asynchronously (apparently in parallel), a task must **hand its turn to other processing while it waits for I/O**. Accordingly, `asyncio` provides functions that, like `sleep`, give up the turn during I/O waits.

Let's try the example of socket communication in the following document. Streams — Python 3.9.0 Documentation

stream.py


import asyncio
import urllib.parse
import sys

async def print_http_headers(url):
    url = urllib.parse.urlsplit(url)
    if url.scheme == 'https':
        reader, writer = await asyncio.open_connection(
            url.hostname, 443, ssl=True)
    else:
        reader, writer = await asyncio.open_connection(
            url.hostname, 80)

    query = (
        f"HEAD {url.path or '/'} HTTP/1.0\r\n"
        f"Host: {url.hostname}\r\n"
        f"\r\n"
    )

    writer.write(query.encode('latin-1'))
    while True:
        line = await reader.readline()
        if not line:
            break

        line = line.decode('latin1').rstrip()
        if line:
            print(f'HTTP header> {line}')

    # Ignore the body, close the socket
    writer.close()

url = sys.argv[1]
asyncio.run(print_http_headers(url))

If you run it with a URL as a command-line argument, as shown below, the HTTP headers are printed.

$ python3 stream.py https://www.google.com/
HTTP header> HTTP/1.0 200 OK
HTTP header> Content-Type: text/html; charset=ISO-8859-1
(Omitted below)

However, this alone is neither asynchronous nor particularly interesting, so let's run CPU-bound work in the background while we wait.

stream2.py


import asyncio
import urllib.parse
import sys

async def print_http_headers(url):
    url = urllib.parse.urlsplit(url)
    if url.scheme == 'https':
        reader, writer = await asyncio.open_connection(
            url.hostname, 443, ssl=True)
    else:
        reader, writer = await asyncio.open_connection(
            url.hostname, 80)

    query = (
        f"HEAD {url.path or '/'} HTTP/1.0\r\n"
        f"Host: {url.hostname}\r\n"
        f"\r\n"
    )

    writer.write(query.encode('latin-1'))
    while True:
        line = await reader.readline()
        if not line:
            break

        line = line.decode('latin1').rstrip()
        if line:
            print(f'HTTP header> {line}')

    # Ignore the body, close the socket
    writer.close()

async def cpu_work():
    x = 0
    for i in range(1000000):
        x += i
        if i % 100000 == 0:
            print(x)
            await asyncio.sleep(0.01)

async def main(url):
    task1 = asyncio.create_task(print_http_headers(url))
    task2 = asyncio.create_task(cpu_work())
    await task1
    await task2

url = sys.argv[1]
asyncio.run(main(url))

As shown below, the calculation runs in the background until the server responds; when the response arrives it is processed, and once that is done the rest of the calculation continues.

0
5000050000
20000100000
45000150000
80000200000
HTTP header> HTTP/1.0 200 OK
HTTP header> Content-Type: text/html; charset=ISO-8859-1
(Omitted)
125000250000
180000300000
245000350000
320000400000
405000450000

`open_connection()` and `readline()` give up control and yield to others until a response comes back. `cpu_work()` (task2), having received the turn, uses the CPU for its calculation, but periodically offers the turn to someone else. If no one takes it, the calculation continues; if someone does, it yields and waits until that work is finished.

The point is the `await asyncio.sleep(0.01)` inside `cpu_work()`: if you forget it, the server's response cannot be processed while the calculation is running. It reminds me of `DoEvents()` in <s>ancient</s> Visual Basic 6.0: during a heavy loop, if you didn't call it inside the loop, the window would freeze and stop responding. <s>If that doesn't ring a bell, ask a veteran near you.</s>

I really want to process in parallel

Still, everything so far runs in a single thread. You may want to keep the `await` style while running several CPU-bound tasks truly at the same time. And even for I/O-bound work, having to write `await` on every read and write is rather tedious, so you might wish for an easier way.

One way to achieve this is to submit the work to a process pool (or thread pool) with run_in_executor().

executor.py


import asyncio
import concurrent.futures

def func1():
    print("func1() started")
    s = sum(i for i in range(10 ** 7))
    print("func1() finished")
    return s

def func2():
    print("func2() started")
    s = sum(i * i for i in range(10 ** 7))
    print("func2() finished")
    return s

async def main():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        task1 = loop.run_in_executor(pool, func1)
        task2 = loop.run_in_executor(pool, func2)
        result1 = await task1
        result2 = await task2
        print('result:', result1, result2)

asyncio.run(main())

If you run this, you can see that func1() and func2() really do run at the same time, as shown below.[^3]

[^3]: Only when there are two or more logical cores. But these days, unless you are on a VPS or a cloud service such as AWS, you will almost certainly have at least two cores.

func1() started
func2() started
func1() finished
func2() finished
result: 49999995000000 333333283333335000000

This uses a mechanism called a "process pool" to run multiple jobs at the same time (process pools themselves existed before `asyncio` appeared). The idea is to reserve a number of processes in advance and reuse them for parallel processing. Note that func1() and func2() have no `async`: **they are just ordinary functions.**

This time we used `loop.run_in_executor()` instead of `asyncio.create_task()`. Up to now, only one party at a time (including `main()`) could work, and a new job could not start until it was handed the turn: when `asyncio.create_task()` is executed, `main()` itself still holds control, so the task does not start immediately. A process pool, by contrast, **can run multiple jobs at the same time.** Moreover, since it runs separately from `main()`, **the work can start the moment `loop.run_in_executor()` is called**, and as many jobs can run simultaneously as there are reserved processes.

The default number of reserved processes is the number of CPU processors (the number of logical cores, including hyper-threading). As a test, change

with concurrent.futures.ProcessPoolExecutor() as pool:

to

with concurrent.futures.ProcessPoolExecutor(max_workers=1) as pool:

and run it again. This time, func2() will not start until func1() finishes.

Considering how fiddly the mechanisms up to this point have been, this is remarkably convenient. **You may find yourself using mostly this (individual results may vary).** <s>Let's set aside the quibble that there is no "io" left in it.</s>

Illustrated

Illustrated, it looks like the following. Blue is the default thread where main() runs, and purple is work running in the process pool. Apart from main(), up to max_workers (default: the number of processors) purple jobs can run simultaneously. image.png

With max_workers=1, it looks like this. Since only one purple job can run at a time, the next one cannot start until task1 (func1()) finishes. image.png

Other tips

I want to get the return value of processing

As in the earlier `executor.py` example, the return value of a function executed asynchronously can be obtained as the value of the `await` expression.

return_value.py


import asyncio

async def func1():
    await asyncio.sleep(1)
    return 12345

async def main():
    task1 = asyncio.create_task(func1())
    ret = await task1
    print(ret) # 12345

asyncio.run(main())

I want to `await` multiple processes at once

Use `asyncio.gather` if you want to wait until everything is done instead of awaiting one at a time. The earlier `executor.py` example can be written like this.

result1, result2 = await asyncio.gather(task1, task2)

Timeout

You can use `asyncio.wait_for()` to interrupt processing that does not finish within a certain time. In the following example, func1() finished is never printed; the program times out and ends.

timeout.py


import asyncio

async def func1():
    print("func1() started")
    await asyncio.sleep(10)
    print("func1() finished")

async def main():
    task1 = asyncio.create_task(func1())
    try:
        ret = await asyncio.wait_for(task1, 1)
        print(ret)
    except asyncio.TimeoutError:
        print("timeout...")

asyncio.run(main())

Finally

When the need arises, start by consulting the documentation. asyncio --- Asynchronous I/O — Python 3.9.0 Documentation
