asyncio is a library to write **concurrent** code using the **async/await** syntax.
asyncio is used as a foundation for multiple Python asynchronous frameworks that provide high-performance network and web servers, database connection libraries, distributed task queues, etc.
asyncio — Asynchronous I/O — Python 3.9.0 documentation
```python
import asyncio

async def main():
    print('Hello ...')
    await asyncio.sleep(1)
    print('... World!')

# Python 3.7+
asyncio.run(main())
```
What's this? Isn't it just like C#'s async/await? I use Python, but I'm ashamed to say I didn't know about it. I write C# code now and then, and every time I touched async/await I wished Python had this too ... Asynchronous programming in C# | Microsoft Docs
Until now, my mental image of concurrency in Python revolved around:
- `threading.Thread`
- `multiprocessing.Process`
Concurrent Execution — Python 3.9.0 documentation (https://docs.python.org/ja/3/library/concurrency.html) Will asyncio become another option going forward?
Hello World!
The sample at the beginning is the code from the official documentation above. First, let's run it. As the code comment says, it requires **Python 3.7 or above**. No special preparation such as installing extra libraries is needed.
helloworld.py
```python
import asyncio

async def main():
    print('Hello ...')
    await asyncio.sleep(1)
    print('... World!')

# Python 3.7+
asyncio.run(main())
```
Terminal
```
$ python3 helloworld.py
Hello ...
... World!
```
Notice that there is a one-second pause between `Hello ...` and `... World!`. The key points are:
- Write the processing equivalent to `main()` as a function defined with `async` (a **coroutine**)
- You can `await` inside a function defined with `async`
- Call `asyncio.run(main())` only once
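As a side note, calling an `async` function does not run it; it just creates a coroutine object, which is why it must be driven by `await` or `asyncio.run()`. A minimal sketch (my own, not from the docs):

```python
import asyncio

async def greet():
    return 'hi'

coro = greet()            # nothing has executed yet
print(type(coro))         # <class 'coroutine'>
print(asyncio.run(coro))  # the coroutine actually runs here -> 'hi'
```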
But this alone isn't very satisfying, right? It's not really asynchronous processing yet.
So what about this?
async_sleep1.py
```python
import asyncio

async def func1():
    print('func1() started')
    await asyncio.sleep(1)
    print('func1() finished')

async def func2():
    print('func2() started')
    await asyncio.sleep(1)
    print('func2() finished')

async def main():
    task1 = asyncio.create_task(func1())
    task2 = asyncio.create_task(func2())
    await task1
    await task2

asyncio.run(main())
```
When you run it, you can see that `func1()` and `func2()` start almost simultaneously, and one second later they finish almost simultaneously.
```
func1() started
func2() started
func1() finished
func2() finished
```
Roughly speaking, `await` means **"hand your turn to someone else and wait"**. Here the "someone else's work" is a Task, which can be created in either of these ways:
- Specifying a predefined operation such as `asyncio.sleep()`
- Wrapping the return value of an `async` function (a coroutine object) with `asyncio.create_task()`
Note that simply writing `func1()` does nothing by itself, but wrapping it as `asyncio.create_task(func1())` schedules `func1()` so it is ready to run. Being "scheduled", though, does not mean it starts working immediately (more on this later).
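To see this "scheduled but not started" behavior, here is a small sketch (my own): the task does not run until `main()` yields control to the event loop.

```python
import asyncio

async def func1():
    print('func1() running')

async def main():
    task = asyncio.create_task(func1())
    print('task created')   # printed first: func1() has not started yet
    await asyncio.sleep(0)  # yield to the event loop once
    print('back in main()') # by now func1() has run
    await task

asyncio.run(main())
```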
asyncio.sleep?
What may bother you here is `await asyncio.sleep(1)`. The meaning is clear enough, but for "wait n seconds" the standard tool is `time.sleep()`. So what happens if we swap it in?
async_sleep2.py
```python
import asyncio
import time  # needed for time.sleep()

async def func1():
    print('func1() started')
    # await asyncio.sleep(1)
    time.sleep(1)
    print('func1() finished')

async def func2():
    print('func2() started')
    # await asyncio.sleep(1)
    time.sleep(1)
    print('func2() finished')

async def main():
    task1 = asyncio.create_task(func1())
    task2 = asyncio.create_task(func2())
    await task1
    await task2

asyncio.run(main())
```
Unfortunately, `func2()` does not start until `func1()` has finished.
```
func1() started
func1() finished
func2() started
func2() finished
```
To run the two tasks concurrently, they have to be suspended with `await asyncio.sleep(1)`.
coroutine asyncio.sleep(delay, result=None, *, loop=None)
Block for delay seconds. If result is provided, it is returned to the caller when the coroutine completes. sleep() always suspends the current task, allowing other tasks to run.
The key phrase is "always suspends the current task, allowing other tasks to run".
When you wait with `asyncio.sleep()`, you **give up your turn whenever someone else's work arrives while you are waiting**. Conversely, **a task merely taking a long time does not let other work cut in**.
Here is another confusing example.
async_sleep3.py
```python
import asyncio
import time

async def func1():
    print('func1() started')
    await asyncio.sleep(1)
    print('func1() finished')

async def func2():
    print('func2() started')
    time.sleep(2)
    print('func2() finished')

async def main():
    task1 = asyncio.create_task(func1())
    task2 = asyncio.create_task(func2())
    await task1
    await task2

asyncio.run(main())
```
`func1()` waits with `asyncio.sleep()`, while `func2()` uses `time.sleep()`, and `func2()` sleeps longer. The result of running this is ...
```
func1() started
func2() started
func2() finished
func1() finished
```
**`func1()`, which should only sleep for one second, somehow finishes after `func2()`.**
Now let's revisit the sentence from the documentation:
sleep() always suspends the current task, allowing other tasks to run.
**With `asyncio.sleep()` you can give your turn to someone else, but you cannot take it back on your own.** If another task arrives while you are sleeping in `asyncio.sleep()`, execution switches to it; but when your sleep time is up, a task that is currently running (i.e. not inside `asyncio.sleep()`) is not interrupted to hand control back.
In the example above, even after `func1()` has waited its one second, `func2()` is still running, so `func1()` cannot force a turn; it has to wait for `func2()` to finish before it can continue. [^func2]
[^func2]: Of course, if `func2()` gave up its turn with `asyncio.sleep()`, `func1()` could start working in the meantime.
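For reference, a sketch of that variant (my own): if `func2()` also awaits, the two interleave as expected.

```python
import asyncio

async def func1():
    print('func1() started')
    await asyncio.sleep(1)
    print('func1() finished')  # now prints after ~1 second

async def func2():
    print('func2() started')
    await asyncio.sleep(2)     # yields instead of blocking
    print('func2() finished')

async def main():
    task1 = asyncio.create_task(func1())
    task2 = asyncio.create_task(func2())
    await task1
    await task2

asyncio.run(main())
```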
Here is how the processing of `async_sleep1.py`, where everything interleaves nicely, can be illustrated. The colored areas represent the time during which each piece actually holds control. In the following, `asyncio.sleep()` is abbreviated to `aio.sleep()`.
1. `main()` schedules `task1` and `task2` with `create_task()`. `task1` and `task2` do not start immediately; they wait.
2. `main()` gives up control and waits for `task1` to finish.
3. `task1` receives control, then releases it by executing `aio.sleep(1)` and waits for one second.
4. `task2` receives control, then releases it by executing `aio.sleep(1)` and waits for one second. Now nothing is runnable.
5. Having waited its one second, `task1` receives control. `task1` finishes, and `main()` regains control.
6. `main()` gives up control and waits for `task2` to finish. Nothing is runnable.
7. Having waited its one second, `task2` receives control. `task2` finishes, `main()` regains control, and `main()` ends.
On the other hand, the processing of `async_sleep3.py` can be illustrated as follows.
With `time.sleep()`, `task2` keeps holding control. Therefore `task1` cannot resume until `task2` finishes, even though its one second of waiting is long over.
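To check this timeline for yourself, here is a sketch (my own) that stamps each event with the elapsed time; the `log()` helper is an addition for illustration.

```python
import asyncio
import time

START = time.perf_counter()

def log(msg):
    print(f'{time.perf_counter() - START:5.2f}s  {msg}')

async def func1():
    log('func1() started')
    await asyncio.sleep(1)
    log('func1() finished')  # stamped at ~2s, not ~1s

async def func2():
    log('func2() started')
    time.sleep(2)            # blocks the whole event loop
    log('func2() finished')

async def main():
    task1 = asyncio.create_task(func1())
    task2 = asyncio.create_task(func2())
    await task1
    await task2

asyncio.run(main())
```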
Around this point we notice something. **We cannot use this to run other work in parallel while the CPU is grinding through a computation. Multiple tasks are never running at the same moment: this is genuinely single-threaded.**
By contrast, with `threading.Thread` you can do the following. [^1]
[^1]: In this code the two functions run in parallel (apparently), but because of the GIL the total processing time does not shrink. If you want real speedups from multiple cores, `multiprocessing.Process` is the better choice. → I investigated the GIL, which you should know if you want to do parallel processing with Python - Qiita
example_threading.py
```python
import threading

def func1():
    print('func1() started')
    x = 0
    for i in range(1000000):
        x += i
    print('func1() finished:', x)

def func2():
    print('func2() started')
    x = 0
    for i in range(1000000):
        x += i * 2
    print('func2() finished:', x)

def main():
    thread1 = threading.Thread(target=func1)
    thread2 = threading.Thread(target=func2)
    thread1.start()
    thread2.start()
    thread1.join()
    thread2.join()

main()
```
```
func1() started
func2() started
func1() finished: 499999500000
func2() finished: 999999000000
```
The name says it: async**io**. In other words, it is a mechanism for (apparently) parallel execution aimed from the start at work that mostly waits on I/O (input/output).
However, to let other work run during a wait, you must wait with the dedicated functions; even `time.sleep()` is off-limits.
The most common I/O wait is probably network send/receive, but judging from the results so far, **unlike with `threading.Thread` and friends, simply writing input/output code the normal way will not give us parallelism. So how are we supposed to use this?**
What we have established is that to perform I/O asynchronously (apparently in parallel), a task must **hand its turn to other work while it waits for the I/O**.
Accordingly, `asyncio` provides functions that, like `sleep`, give up the turn, but during I/O waits.
Let's try the socket-communication example from the following documentation. Streams — Python 3.9.0 documentation
stream.py
```python
import asyncio
import urllib.parse
import sys

async def print_http_headers(url):
    url = urllib.parse.urlsplit(url)
    if url.scheme == 'https':
        reader, writer = await asyncio.open_connection(
            url.hostname, 443, ssl=True)
    else:
        reader, writer = await asyncio.open_connection(
            url.hostname, 80)

    query = (
        f"HEAD {url.path or '/'} HTTP/1.0\r\n"
        f"Host: {url.hostname}\r\n"
        f"\r\n"
    )

    writer.write(query.encode('latin-1'))
    while True:
        line = await reader.readline()
        if not line:
            break
        line = line.decode('latin1').rstrip()
        if line:
            print(f'HTTP header> {line}')

    # Ignore the body, close the socket
    writer.close()

url = sys.argv[1]
asyncio.run(print_http_headers(url))
```
Run it with a URL as a command-line argument, as shown below, and the HTTP headers come back.
```
$ python3 stream.py https://www.google.com/
HTTP header> HTTP/1.0 200 OK
HTTP header> Content-Type: text/html; charset=ISO-8859-1
(omitted)
```
But this alone is neither asynchronous nor particularly interesting, so let's run some CPU work behind the scenes while we wait.
stream2.py
```python
import asyncio
import urllib.parse
import sys

async def print_http_headers(url):
    url = urllib.parse.urlsplit(url)
    if url.scheme == 'https':
        reader, writer = await asyncio.open_connection(
            url.hostname, 443, ssl=True)
    else:
        reader, writer = await asyncio.open_connection(
            url.hostname, 80)

    query = (
        f"HEAD {url.path or '/'} HTTP/1.0\r\n"
        f"Host: {url.hostname}\r\n"
        f"\r\n"
    )

    writer.write(query.encode('latin-1'))
    while True:
        line = await reader.readline()
        if not line:
            break
        line = line.decode('latin1').rstrip()
        if line:
            print(f'HTTP header> {line}')

    # Ignore the body, close the socket
    writer.close()

async def cpu_work():
    x = 0
    for i in range(1000000):
        x += i
        if i % 100000 == 0:
            print(x)
            await asyncio.sleep(0.01)

async def main(url):
    task1 = asyncio.create_task(print_http_headers(url))
    task2 = asyncio.create_task(cpu_work())
    await task1
    await task2

url = sys.argv[1]
asyncio.run(main(url))
```
As shown below, the computation runs in the background until the server responds; when the response arrives it is handled, and once that is done the rest of the computation proceeds.
```
0
5000050000
20000100000
45000150000
80000200000
HTTP header> HTTP/1.0 200 OK
HTTP header> Content-Type: text/html; charset=ISO-8859-1
(omitted)
125000250000
180000300000
245000350000
320000400000
405000450000
```
`open_connection()` and `readline()` give up control to others until a response comes back. `cpu_work()` (`task2`), which receives the turn, uses the CPU for its computation, but periodically offers the turn to someone else. If no one takes it, the computation continues; if someone does, it yields and waits until the other task's work is done.
The point is the `await asyncio.sleep(0.01)` inside `cpu_work()`: forget it, and the response cannot be handled even if it arrives from the server mid-computation. <s>It reminds me of `DoEvents()` in the ancient Visual Basic 6.0: during a heavy loop, if you didn't call it, the window would freeze and stop responding. If that doesn't ring a bell, ask an old-timer at home.</s>
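Incidentally, if you only want to offer the turn without actually pausing, a common pattern (my addition, not from the article) is `await asyncio.sleep(0)`, which yields to the event loop exactly once; the 0.01-second sleep above additionally throttles the CPU work itself.

```python
import asyncio

async def cpu_work_yielding():
    x = 0
    for i in range(1000000):
        x += i
        if i % 100000 == 0:
            # Offer the turn to other tasks without sleeping.
            await asyncio.sleep(0)
    return x
```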
Still, all of this is a single thread. Sometimes you want to run several CPU-bound computations at the same time, `await`-style. And even for I/O-heavy code, having to remember `await` on every `read` and `write` is tedious, and easy to get wrong.
One way out is to hand the work to a process pool (or thread pool) with `run_in_executor()`.
executor.py
```python
import asyncio
import concurrent.futures

def func1():
    print("func1() started")
    s = sum(i for i in range(10 ** 7))
    print("func1() finished")
    return s

def func2():
    print("func2() started")
    s = sum(i * i for i in range(10 ** 7))
    print("func2() finished")
    return s

async def main():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        task1 = loop.run_in_executor(pool, func1)
        task2 = loop.run_in_executor(pool, func2)
        result1 = await task1
        result2 = await task2
    print('result:', result1, result2)

asyncio.run(main())
```
Run this and you can see that `func1()` and `func2()` really are executing at the same time, as shown below. [^3]
[^3]: Only if you have two or more logical cores. But these days, unless you are on a cloud service such as a VPS or AWS, you almost certainly do.
```
func1() started
func2() started
func1() finished
func2() finished
result: 49999995000000 333333283333335000000
```
Here we use a mechanism called a "process pool" to run multiple jobs at the same time (process pools themselves existed long before `asyncio`). The idea is to reserve some processes in advance and reuse them for parallel work. Note that `func1()` and `func2()` carry no `async`: **they are just ordinary functions.**
This time we used `loop.run_in_executor()` instead of `asyncio.create_task()`. Until now, only one piece of work, `main()` included, could run at any moment, and a new job could not start until it was handed the turn: at the moment `asyncio.create_task()` is executed, `main()` itself still holds control, so the submitted work does not start right away. A process pool, on the other hand, **can run multiple jobs truly simultaneously.** Moreover, since the pool lives outside `main()`, **the work can start immediately when `loop.run_in_executor()` is called**, and as many jobs can run at once as there are reserved processes.
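As an aside, the same `run_in_executor()` also accepts a thread pool, which suits blocking I/O (even `time.sleep()`) rather than CPU-bound work. A minimal sketch under that assumption (my own, not from the article):

```python
import asyncio
import concurrent.futures
import time

def blocking_io():
    # An ordinary blocking call is fine inside a worker thread.
    time.sleep(1)
    return 'done'

async def main():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor() as pool:
        task1 = loop.run_in_executor(pool, blocking_io)
        task2 = loop.run_in_executor(pool, blocking_io)
        # Both sleeps overlap, so this takes about 1 second, not 2.
        print(await task1, await task2)

asyncio.run(main())
```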
The default number of reserved processes is the number of CPU processors (logical cores, including hyper-threading). As a test, change this line:
```python
with concurrent.futures.ProcessPoolExecutor() as pool:
```
to:
```python
with concurrent.futures.ProcessPoolExecutor(max_workers=1) as pool:
```
Now `func2()` does not start until `func1()` has finished.
Compared with the fiddly mechanics so far, this is remarkably convenient. **You may be tempted to just use this (opinions vary).** <s>Let's hold back the retort that there is no "io" left in it.</s>
Illustrated, it looks like the following: blue is `main()` running in the default thread, and purple is work running in the process pool. Apart from `main()`, purple jobs can execute simultaneously, up to `max_workers` (or the number of processors).
With `max_workers=1` it looks like this: since only one purple job can run at a time, the next one does not start until `task1` (`func1()`) has finished.
As the earlier `executor.py` example already showed, the return value of an asynchronously executed function can be obtained as the value of the `await` expression.
return_value.py
```python
import asyncio

async def func1():
    await asyncio.sleep(1)
    return 12345

async def main():
    task1 = asyncio.create_task(func1())
    ret = await task1
    print(ret)  # 12345

asyncio.run(main())
```
Use `asyncio.gather()` if you want to wait until everything is done instead of awaiting tasks one by one. The earlier `executor.py` example becomes:
```python
result1, result2 = await asyncio.gather(task1, task2)
```
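For what it's worth, `gather()` also accepts coroutine objects directly and wraps them in Tasks for you. A minimal sketch (my own):

```python
import asyncio

async def square(x):
    await asyncio.sleep(1)
    return x * x

async def main():
    # Both run concurrently; results come back in argument order.
    results = await asyncio.gather(square(2), square(3))
    print(results)  # [4, 9]

asyncio.run(main())
```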
You can use `asyncio.wait_for()` to abort the work if it does not finish within a given time.
In the following example, `func1() finished` is never printed; the program times out and ends.
timeout.py
```python
import asyncio

async def func1():
    print("func1() started")
    await asyncio.sleep(10)
    print("func1() finished")

async def main():
    task1 = asyncio.create_task(func1())
    try:
        ret = await asyncio.wait_for(task1, 1)
        print(ret)
    except asyncio.TimeoutError:
        print("timeout...")

asyncio.run(main())
```
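One detail worth knowing (an addition of mine, though it matches the documented behavior): on timeout, `wait_for()` cancels the task, which surfaces inside the coroutine as `asyncio.CancelledError`, giving it a chance to clean up.

```python
import asyncio

async def func1():
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        print('func1() cancelled')  # cleanup could go here
        raise  # re-raise so the cancellation completes

async def main():
    try:
        await asyncio.wait_for(func1(), 1)
    except asyncio.TimeoutError:
        print('timeout...')

asyncio.run(main())
```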
From here on, consult the documentation as needed. asyncio — Asynchronous I/O — Python 3.9.0 documentation