I'm sure many people have heard of asynchronous programming. For example, JavaScript on the front end is a single-threaded language that must not block the main thread, so various functions are implemented asynchronously. Node.js inherits that property and is good at I/O-bound tasks. However, Python already supports concurrent and parallel processing, so most people have no experience using asynchronous programming in their own projects. Of course, many people have used asynchronous frameworks such as Tornado, Twisted, and [Gevent](http://www.gevent.org/) because they are famous, but when you run into strange errors, they are hard to solve.
As you can see from recent trends at PyCon, asynchronous programming is arguably the next big thing in the Python ecosystem. In addition, emerging programming languages such as Go and Rust advertise asynchronous processing and high-performance concurrency. Not to be left behind, Guido, the creator of Python, started developing the Tulip (asyncio) project himself in 2013.
First, I would like to explain the related concepts, and then move on to asynchronous processing itself.
Blocking is everywhere. For example, while the CPU is performing a ["context switch"](https://ja.wikipedia.org/wiki/コンテキストスイッチ), other processes cannot be processed and are blocked. (In the multi-core case, the core performing the context switch becomes unavailable.)
Non-blocking is the opposite of blocking: because blocking a process lowers computational efficiency, that process is made non-blocking.
The communication methods here refer to the "synchronization primitives" of asynchronous and parallel processing, such as semaphores, locks, and synchronization queues. These communication methods synchronize multiple programs under certain conditions. They are necessary precisely because asynchronous processing exists: if all programs processed synchronously, they would run in order from the beginning, and no communication method would be required.
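To make this concrete, here is a minimal sketch using Python's standard `threading` and `queue` modules; the worker function and the shared counter are made up for illustration. It shows two of these primitives, a lock and a synchronization queue:

```python
import threading
import queue

counter = 0
lock = threading.Lock()   # a lock: one of the synchronization primitives
tasks = queue.Queue()     # a synchronization queue: thread-safe by design


def worker():
    global counter
    while True:
        item = tasks.get()       # blocks until an item is available
        if item is None:         # sentinel value telling the worker to stop
            break
        with lock:               # only one thread may update counter at a time
            counter += item
        tasks.task_done()


threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
for _ in range(100):
    tasks.put(1)
for _ in threads:
    tasks.put(None)
for t in threads:
    t.join()
print(counter)  # always 100, thanks to the lock
```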
For concurrent / parallel processing, please refer to the previous article.
A program should be multitasked to support concurrency. Blocking / non-blocking and synchronous / asynchronous are defined per task, so concurrency, asynchrony, and non-blocking are closely related.
Synchronous / asynchronous and blocking / non-blocking are not mutually exclusive. For example, an EC site processes multi-user access requests asynchronously, but inventory updates must be processed synchronously.
As a result, most asynchronous frameworks simplify the asynchronous programming model (allowing only one event at a time), and discussions about asynchronous programming focus on the single-threaded case.
So if you adopt asynchronous programming, each asynchronous call must be made small. "Small" here means shortening the computation time. How to divide a program into asynchronous tasks then becomes a challenge.
As mentioned above, asynchronous programming has many difficulties. "asyncio", which the creator of Python built himself over four years, became a standard library in Python 3.6. So why go to such lengths? Because asynchronous programming is extremely useful.
Assume a CPU clock speed of 2.6 GHz, i.e., it can process $2.6 \times 10^9$ instructions per second, so each instruction takes about 0.38 ns. 0.38 ns is the smallest unit in the CPU's sense of time. Now let the smallest unit in the human sense of time be 1 s. The table below converts CPU time into a human time sense based on this ratio. For the CPU latency data, see "Latency numbers every programmer should know".
| Processing | Actual latency | CPU time sense |
|---|---|---|
| Instruction processing | 0.38 ns | 1 s |
| Read L1 cache | 0.5 ns | 1.3 s |
| Branch misprediction correction | 5 ns | 13 s |
| Read L2 cache | 7 ns | 18.2 s |
| Mutex lock/unlock | 25 ns | 1 min 5 s |
| Main memory reference | 100 ns | 4 min 20 s |
| Context switch | 1.5 µs | 1 h 5 min |
| Upload 2 KB of data over a 1 Gbps network | 20 µs | 14.4 h |
| Read 1 MB of sequential data from memory | 250 µs | 7.5 days |
| Ping a host in the same internet data center (round trip) | 0.5 ms | 15 days |
| Read 1 MB of sequential data from an SSD | 1 ms | 1 month |
| Read 1 MB of sequential data from a hard disk | 20 ms | 20 months |
| Ping a host in a different prefecture (round trip) | 150 ms | 12.5 years |
| Reboot a virtual machine | 4 s | 300 years |
| Reboot a server | 5 min | 25,000 years |
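The "CPU time sense" column is simply each actual latency divided by the 0.38 ns instruction time. A minimal sketch of that conversion (the latency values are the ones from the table):

```python
# convert real-world latencies into the "CPU time sense" used in the table:
# scale each latency by (1 second / 0.38 nanoseconds)
INSTRUCTION_TIME = 0.38e-9  # seconds per instruction on a 2.6 GHz CPU

latencies = {
    'read L1 cache': 0.5e-9,
    'context switch': 1.5e-6,
    'upload 2KB over 1Gbps network': 20e-6,
    'read 1MB from SSD': 1e-3,
}

for name, seconds in latencies.items():
    cpu_sense = seconds / INSTRUCTION_TIME  # in "CPU-sense seconds"
    print(f'{name}: {cpu_sense:,.0f} CPU-sense seconds '
          f'(= {cpu_sense / 3600:,.1f} hours)')
```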
The CPU is the processing core of a computer and a valuable resource. If you waste CPU execution time and lower its utilization, your program inevitably becomes less efficient. As the table above shows, uploading 2 KB of data over a 1 Gbps network is like spending 14 hours in the CPU's sense of time. On a 10 Mbps network, the efficiency drops another 100 times. Making the CPU just wait this long instead of moving on to other processing is nothing but a waste of the CPU's "youth".
If a program does not make good use of a computer's computing resources, more computers are needed to fill the gap. For example, redesigning a scraping program with asynchronous programming can reduce the originally required 7 servers to 3, cutting costs by about 57%. By the way, on AWS, a reserved m4.xlarge instance costs about 150,000 yen per year.
Even if you don't care about money, you should care about efficiency. Beyond a certain number of servers, you must improve the architecture and program design; otherwise, adding more servers may not improve performance at all, while the management cost rises dramatically. When reservations for the PS5 or Xbox Series X open and an EC site returns 503 errors, it is likely an architectural issue rather than a lack of servers.
"C10K Problem" was submitted in 1999 and how to do it 1Ghz It's like a challenge to be able to provide FTP service to 10,000 clients at the same time with a single server in a network environment of CPU, 2G memory and 1Gbps. Since 2010, due to the improvement of hardware performance, "C10M problem" has been submitted following C10K. The C10M problem is a problem that handles 1 million simultaneous accesses per second in a network environment with an 8-core CPU, 64G memory and 10Gbps.
Cost and efficiency are issues from the perspective of corporate management, while the C10K / C10M problem poses a technical challenge against the hardware. Solving the C10K / C10M problem solves the cost and efficiency problems at the same time.
The CPU computes very fast, but context switches, memory reads, hard disk reads, and network communication are very slow. In other words, away from the CPU, everything except the L1 cache is slow. A computer consists of five major units: input, output, storage, control, and arithmetic. The control unit and the arithmetic unit are in the CPU, but everything else is I/O. Reading and writing memory, reading and writing the hard disk, and reading and writing the network interface card are all I/O. I/O is the biggest bottleneck.
Asynchronous programs can be more efficient, but since the biggest bottleneck is I/O, the solution is ["asynchronous I/O"](https://ja.wikipedia.org/wiki/非同期IO).
The Internet is perhaps the largest "program" on the planet. And as the [CPU time sense](#2-1-cpu-time-sense) table shows, network I/O is slower than hard disk I/O and is the biggest bottleneck. Various asynchronous frameworks target network I/O, because nothing is slower than network I/O, short of rebooting the server.
Let's take scraping as an example: downloading 10 web pages from the net. The easiest method is to download them in order: establish a `socket` connection, send the request, and receive the response.
```python
import socket
import time


def blocking_way():
    sock = socket.socket()
    # blocking
    sock.connect(('example.com', 80))
    request = 'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n'
    sock.send(request.encode('ascii'))
    response = b''
    chunk = sock.recv(4096)
    while chunk:
        response += chunk
        # blocking
        chunk = sock.recv(4096)
    return response


def sync_way():
    res = []
    for i in range(10):
        res.append(blocking_way())
    return len(res)


if __name__ == '__main__':
    elapsed_times = 0
    for _ in range(10):
        start = time.time()
        sync_way()
        elapsed_time = time.time() - start
        elapsed_times += elapsed_time
        print(f"elapsed_time: {(elapsed_time):.2f}[sec]")
    print(f"mean_elapsed_time: {(elapsed_times/10):.2f}[sec]")
```
Execution result:

```
elapsed_time: 2.76[sec]
elapsed_time: 2.58[sec]
elapsed_time: 2.56[sec]
elapsed_time: 2.58[sec]
elapsed_time: 2.85[sec]
elapsed_time: 2.66[sec]
elapsed_time: 2.60[sec]
elapsed_time: 3.38[sec]
elapsed_time: 2.88[sec]
elapsed_time: 2.67[sec]
mean_elapsed_time: 2.75[sec]
```
The average over 10 runs is 2.75 seconds. The `blocking_way()` function establishes a `socket` connection, sends an HTTP request, reads the HTTP response from the `socket`, and returns the data. The `sync_way()` function just repeats it 10 times. In the code above, `sock.connect(('example.com', 80))` sends a connection request to port 80 of the server, and `sock.recv(4096)` reads up to 4 KB of byte data from the `socket` at a time.
When a network connection is established is determined not by the client but by the network environment and the server's processing power, and it is unpredictable when data will come back from the server. Therefore, `sock.connect()` and `sock.recv()` block by default. `sock.send()`, on the other hand, does not block for long: it returns as soon as it has copied the request data into the buffer of the TCP/IP protocol stack, without waiting for any response from the server.
If the network environment is very bad and it takes 1 second to establish a connection, `sock.connect()` blocks for 1 second. That one second feels like 83 years to a 2.6 GHz CPU, and for those 83 years the CPU can do nothing. Similarly, `sock.recv()` has to wait until the client receives the response from the server. Downloading the example.com home page 10 times repeats this blocking 10 times. But what about large-scale scraping that downloads 10 million web pages a day?
In summary, synchronous blocking network I/O is very inefficient, especially in programs that communicate frequently. Such a method cannot solve C10K / C10M.
If running the same program 10 times in sequence takes too long, just run 10 copies of it at the same time. So let's introduce multi-processing. Incidentally, in operating systems before Linux 2.4, the process was the entity of a task; the OS was designed process-oriented.
```python
import socket
import time
from concurrent.futures import ProcessPoolExecutor


def blocking_way():
    sock = socket.socket()
    # blocking
    sock.connect(('example.com', 80))
    request = 'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n'
    sock.send(request.encode('ascii'))
    response = b''
    chunk = sock.recv(4096)
    while chunk:
        response += chunk
        # blocking
        chunk = sock.recv(4096)
    return response


def multi_process_way():
    with ProcessPoolExecutor(max_workers=10) as executor:
        futures = {executor.submit(blocking_way) for i in range(10)}
        return len([future.result() for future in futures])


if __name__ == '__main__':
    elapsed_times = 0
    for _ in range(10):
        start = time.time()
        multi_process_way()
        elapsed_time = time.time() - start
        elapsed_times += elapsed_time
        print(f"elapsed_time: {(elapsed_time):.2f}[sec]")
    print(f"mean_elapsed_time: {(elapsed_times/10):.2f}[sec]")
```
Execution result:

```
elapsed_time: 0.49[sec]
elapsed_time: 0.47[sec]
elapsed_time: 0.48[sec]
elapsed_time: 0.49[sec]
elapsed_time: 0.54[sec]
elapsed_time: 0.51[sec]
elapsed_time: 0.56[sec]
elapsed_time: 0.52[sec]
elapsed_time: 0.47[sec]
elapsed_time: 0.47[sec]
mean_elapsed_time: 0.50[sec]
```
The average over 10 runs is 0.50 seconds. It worked. However, it is not one tenth of the synchronous blocking method. The reason is that the CPU of the execution environment does not have 10 cores, so processes must be switched.
Switching processes is not as cheap as the CPU context switch shown in [CPU time sense](#2-1-cpu-time-sense). When the CPU switches from one process to another, it first saves all the register and memory state of the running process, then restores the saved state of the other process. To the CPU, this is like waiting a few hours. When there are more processes than CPU cores, process switching is always necessary.
Besides switching, multi-processing has other drawbacks. Since ordinary servers must run in a stable state, the number of processes they can handle simultaneously is limited to tens to hundreds. Too many processes cause system instability and exhaust memory resources. And in addition to switching cost and small scale, multi-processing has problems with sharing state and data.
A thread's data structures are lighter than a process's, and a process can contain multiple threads. OSs newer than Linux 2.4 also changed the minimum scheduling unit from the process to the thread. A process now exists merely as a container for threads and plays the role of managing resources. OS-level threads are distributed across the CPU cores and can run concurrently.
```python
import socket
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_way():
    sock = socket.socket()
    # blocking
    sock.connect(('example.com', 80))
    request = 'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n'
    sock.send(request.encode('ascii'))
    response = b''
    chunk = sock.recv(4096)
    while chunk:
        response += chunk
        # blocking
        chunk = sock.recv(4096)
    return response


def multi_thread_way():
    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = {executor.submit(blocking_way) for i in range(10)}
        return len([future.result() for future in futures])


if __name__ == '__main__':
    elapsed_times = 0
    for _ in range(10):
        start = time.time()
        multi_thread_way()
        elapsed_time = time.time() - start
        elapsed_times += elapsed_time
        print(f"elapsed_time: {(elapsed_time):.2f}[sec]")
    print(f"mean_elapsed_time: {(elapsed_times/10):.2f}[sec]")
```
Execution result:

```
elapsed_time: 0.31[sec]
elapsed_time: 0.32[sec]
elapsed_time: 0.30[sec]
elapsed_time: 0.31[sec]
elapsed_time: 0.31[sec]
elapsed_time: 0.26[sec]
elapsed_time: 0.30[sec]
elapsed_time: 0.31[sec]
elapsed_time: 0.30[sec]
elapsed_time: 0.27[sec]
mean_elapsed_time: 0.30[sec]
```
The average over 10 runs is 0.30 seconds. As expected, faster than multi-processing. Multithreading solves the problem of slow process switching, and the scale of tasks that can be processed simultaneously grows from the hundreds of multi-processing to the thousands.
But multithreading has problems too. First, Python multithreading is subject to the ["GIL"](https://ja.wikipedia.org/wiki/グローバルインタプリタロック), so it cannot take advantage of multi-core CPUs: within one Python process, only one thread is allowed to be active at any given time. So why was multithreading faster than multiprocessing?
The reason is that when a blocking system call such as `sock.connect()` or `sock.recv()` is made, the current thread releases the GIL, giving other threads a chance to run. In a single thread, however, blocking system calls still block.
Trivia: Python's `time.sleep` is a blocking call, but in multithreaded programming, `time.sleep()` does not block other threads.
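A minimal sketch illustrating this (the `napper` function is made up for the example): ten threads each sleep for one second, yet the whole program finishes in roughly one second, because a sleeping thread releases the GIL.

```python
import threading
import time


def napper():
    time.sleep(1)  # blocks this thread only; the GIL is released meanwhile


start = time.time()
threads = [threading.Thread(target=napper) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"{time.time() - start:.2f}[sec]")  # roughly 1 second, not 10
```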
Besides the GIL, multithreading has some common problems. Threads are scheduled by the OS, and their "scheduling strategy" is ["preemption"](https://ja.wikipedia.org/wiki/プリエンプション), which guarantees that threads of the same priority get equal chances to run. Because preemption is first-come-first-served, it is unpredictable which thread will run and which code will execute next, which can lead to ["race conditions"](https://ja.wikipedia.org/wiki/競合状態).
For example, when a scraping worker thread polls the task queue for the next URL to scrape, if multiple threads poll at the same time, which one should receive it? Locks and synchronization queues are therefore needed to prevent the same task from being executed more than once (see the sketch below). Also, while multithreading can handle hundreds to thousands of tasks simultaneously, that is still inadequate for large, high-frequency web systems. And of course, the biggest problem with multithreading remains race conditions.
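A minimal sketch of that queue-based pattern, with hypothetical URLs; `queue.Queue` hands each task to exactly one thread, so no URL is scraped twice:

```python
import threading
import queue

todo = queue.Queue()
for i in range(100):
    todo.put(f'http://example.com/{i}')  # hypothetical URLs


def worker():
    while True:
        try:
            url = todo.get_nowait()  # each URL is handed to exactly one thread
        except queue.Empty:
            return                   # queue drained, worker exits
        # ... download url here ...
        todo.task_done()


threads = [threading.Thread(target=worker) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```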
## 3-4. Asynchronous non-blocking method

Finally, we have reached the non-blocking method. First, let's see how the most primitive non-blocking works.
```python
import socket
import time


def nonblocking_way():
    sock = socket.socket()
    sock.setblocking(False)
    # the socket raises an error when a non-blocking connect is requested
    try:
        sock.connect(('example.com', 80))
    except BlockingIOError:
        pass
    request = 'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n'
    data = request.encode('ascii')
    # keep retrying the send, because we cannot predict when the
    # connection will be established
    while 1:
        try:
            sock.send(data)
            break
        except OSError:
            pass
    response = b''
    # keep retrying the recv, because we cannot predict when the
    # response will become readable
    while 1:
        try:
            chunk = sock.recv(4096)
            while chunk:
                response += chunk
                chunk = sock.recv(4096)
            break
        except OSError:
            pass
    return response


def sync_way():
    res = []
    for i in range(10):
        res.append(nonblocking_way())
    return len(res)


if __name__ == '__main__':
    elapsed_times = 0
    for _ in range(10):
        start = time.time()
        sync_way()
        elapsed_time = time.time() - start
        elapsed_times += elapsed_time
        print(f"elapsed_time: {(elapsed_time):.2f}[sec]")
    print(f"mean_elapsed_time: {(elapsed_times/10):.2f}[sec]")
```
Execution result:

```
elapsed_time: 2.71[sec]
elapsed_time: 2.82[sec]
elapsed_time: 2.73[sec]
elapsed_time: 2.69[sec]
elapsed_time: 2.82[sec]
elapsed_time: 2.72[sec]
elapsed_time: 2.51[sec]
elapsed_time: 2.65[sec]
elapsed_time: 2.75[sec]
elapsed_time: 3.50[sec]
mean_elapsed_time: 2.79[sec]
```
The average over 10 runs is 2.79 seconds. I feel cheated. The computation time is on the same level as synchronous blocking, yet the code is more complicated. You might think non-blocking is unnecessary.
First, in the code above, `sock.setblocking(False)` instructs the OS to make the blocking system calls on the `socket` non-blocking. As mentioned earlier, non-blocking means that while you do one thing, the program that called you is not prevented from doing other things. The code above indeed no longer blocks after running `sock.connect()` and `sock.recv()`.
And the code got complicated precisely because it no longer blocks. When `connect()` is called, the OS first raises an error, so it must be caught with `try` here. And, without blocking, execution immediately moves on to the code below. The `while` loop repeats `send()` because `connect()` has become non-blocking and we don't know when the connection will be established, so we have to keep trying. And even after `send()` succeeds, since we don't know when the response will arrive, the call to `recv()` is also repeated.
`connect()` and `recv()` no longer block the main program, but the CPU's spare time is not being used effectively: that time is spent just repeating `socket` reads and writes and error handling in the `while` loops. And since the 10 downloads are still executed in order, the total computation time is the same as the synchronous blocking method.
### 3-5-1. epoll

If the OS checks whether each non-blocking call is ready, the application does not have to wait or poll in a loop, and can improve efficiency by spending the free time on other processing.
So the OS encapsulated I/O state changes as events: the readable event, the writable event, and so on. It also provided a system module so that applications could receive event notifications: `select`. Through `select`, an application can register ["file descriptors"](https://ja.wikipedia.org/wiki/ファイル記述子) and callback functions. When the state of a file descriptor changes, `select` calls the pre-registered callback function. This method is called I/O multiplexing.
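A minimal sketch of the underlying primitive, using the standard `select` module directly to wait for a single non-blocking socket to become writable (i.e., connected), instead of retrying in a busy loop:

```python
import select
import socket

sock = socket.socket()
sock.setblocking(False)
try:
    sock.connect(('example.com', 80))
except BlockingIOError:
    pass

# block here until the socket becomes writable (= connection established)
_, writable, _ = select.select([], [sock], [])
print('connected:', sock in writable)
```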
`select` was later improved into `poll` because of its inefficient algorithm. Furthermore, the BSD kernel upgraded it to the `kqueue` module, and the Linux kernel to the `epoll` module. The functionality of these four modules is the same, and the available APIs are about the same. The difference is that `kqueue` and `epoll` are more efficient than the other two when handling large numbers of file descriptors.
Because of the widespread use of Linux servers, you hear about the `epoll` module most often. Assuming the number of file descriptors is $N$, `select` and `poll` take $O(N)$ time, while `epoll` takes $O(1)$. Also, `epoll` mounts all listening file descriptors on one special file descriptor, which can be shared by multiple threads.
We have left the listening for I/O events to the OS. When the I/O state changes (for example, when a socket connection is established and data can be sent), what should the OS do next? A **callback**.
Here we encapsulate the sending and reading of data in separate functions. While `epoll` listens to the `socket` state on behalf of the application, call the HTTP-request-sending function when `epoll` reports that the `socket` state is writable (the connection is established), and call the response-handling function when the `socket` state becomes readable (the client has received the response).
Let's refactor the scraping code with `epoll` and callbacks.
```python
import socket
import time
from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ


class Crawler:
    def __init__(self, path):
        self.path = path
        self.sock = None
        self.response = b''

    def fetch(self):
        self.sock = socket.socket()
        self.sock.setblocking(False)
        try:
            self.sock.connect(('example.com', 80))
        except BlockingIOError:
            pass
        selector.register(self.sock.fileno(), EVENT_WRITE, self.connected)

    def connected(self, key, mask):
        selector.unregister(key.fd)
        get = f'GET {self.path} HTTP/1.0\r\nHost: example.com\r\n\r\n'
        self.sock.send(get.encode('ascii'))
        selector.register(key.fd, EVENT_READ, self.read_response)

    def read_response(self, key, mask):
        global stopped
        chunk = self.sock.recv(4096)
        if chunk:
            self.response += chunk
        else:
            selector.unregister(key.fd)
            paths_todo.remove(self.path)
            if not paths_todo:
                stopped = True
```
The difference is that we download 10 different pages instead of the same page. The set of page paths to download, `paths_todo`, is defined later. Let's look at the improvements.
First, the two `send()` and `recv()` retry loops are gone. Then we introduced the `selectors` module and created a `DefaultSelector` instance. The Python standard library `selectors` is an encapsulation of `select` / `poll` / `epoll` / `kqueue`; `DefaultSelector` selects the best module available on the OS. On Linux 2.5.44 and later, that is `epoll`. We also registered callback functions to be invoked after the writable event `EVENT_WRITE` or the readable event `EVENT_READ` of the `socket` occurs.
The structure of the code is now cleaner, and blocking notifications are left to the OS. However, to download 10 different pages, we need 10 `Crawler` instances, and 20 events will occur. How do we get each event as it occurs from the `selector` and execute the corresponding callback?
The only way to solve this is the old method: a loop. Query the `selector` module and wait for it to tell us which event has occurred and which callback to call. This waiting loop for event notifications is called an ["event loop"](https://ja.wikipedia.org/wiki/イベントループ).
```python
def loop():
    while not stopped:
        # block until some event occurs
        events = selector.select()
        for event_key, event_mask in events:
            callback = event_key.data
            callback(event_key, event_mask)
```
In the code above, a global variable `stopped` controls when the event loop stops. When all of `paths_todo` have been consumed, `stopped` is changed to `True`.
`selector.select()` is a blocking call: if no event has occurred, the application has nothing to handle and must block until an event occurs. As you might expect, when downloading only one web page, `send()` and `recv()` can only happen after `connect()`, so the processing efficiency is the same as the synchronous blocking method. The reason is that even without blocking in `connect()` or `recv()`, we still block in `select()`.
So the `selector` mechanism (hereafter `epoll`/`kqueue`) was designed to solve large-scale concurrent access. The `selector` mechanism is at its best when the system has a large number of non-blocking calls and events occur almost at random.
The code below creates 10 download tasks and starts the event loop.
```python
if __name__ == '__main__':
    elapsed_times = 0
    for _ in range(10):
        selector = DefaultSelector()
        stopped = False
        paths_todo = {'/', '/1', '/2', '/3', '/4', '/5', '/6', '/7', '/8', '/9'}
        start = time.time()
        for path in paths_todo:
            crawler = Crawler(path)
            crawler.fetch()
        loop()
        elapsed_time = time.time() - start
        elapsed_times += elapsed_time
        print(f"elapsed_time: {(elapsed_time):.2f}[sec]")
    print(f"mean_elapsed_time: {(elapsed_times/10):.2f}[sec]")
```
Execution result:

```
elapsed_time: 0.29[sec]
elapsed_time: 0.27[sec]
elapsed_time: 0.24[sec]
elapsed_time: 0.27[sec]
elapsed_time: 0.29[sec]
elapsed_time: 0.29[sec]
elapsed_time: 0.32[sec]
elapsed_time: 0.30[sec]
elapsed_time: 0.30[sec]
elapsed_time: 0.34[sec]
mean_elapsed_time: 0.29[sec]
```
The average over 10 runs is 0.29 seconds. Impressive. We solved the problem of downloading 10 pages simultaneously with "event loop + callback" in a single thread. This is asynchronous programming. There is a `for` loop that looks like it creates `Crawler` instances in sequence and calls the `fetch` method, but the `fetch` method only handles `connect()` and event registration. And, judging from the execution time, the download tasks were clearly done at the same time.
Asynchronous processing procedure of the above code:

1. The `fetch()` method of a `Crawler` instance creates a `socket` connection and registers a writable event in the `selector`.
2. `fetch()` contains no blocking processing, so it returns immediately.
3. This is repeated 10 times, and all 10 download tasks enter the event loop.
4. The event loop starts and blocks waiting for events.
5. When the writable event of one download task occurs, its `connected()` method is called back, and the first loop iteration ends.
6. In subsequent iterations, the `EVENT_READ` of the connected task may occur, and the `EVENT_WRITE` of other tasks may occur (the time during which one task is blocked is used to execute other tasks).

We've now seen everything from synchronous blocking to asynchronous non-blocking, and can perform the black magic of processing multiple network I/O blocking tasks concurrently in a single thread. Compared to multithreading, we don't even have to switch threads: each callback execution is a function call completed within the thread's stack. The performance is also excellent: the number of tasks one server can process simultaneously grows to the tens of thousands to hundreds of thousands.
For some programming languages, asynchronous programming support ends here, and engineers must spend a lot of time directly using `epoll` to register events and callbacks, manage the event loop, and design callbacks.
As you can see from everything so far, no matter what language you use, asynchronous programming cannot escape the "event loop + callback" pattern above. It may not use `epoll`, and it may not be a `while` loop, but all of them are asynchronous in the "notify me later" model.
So why don't you see the callback pattern in some asynchronous programming? We'll look at that next: we haven't yet talked about coroutines, the protagonists of Python's asynchronous programming.
From here, I'll explain how Python's asynchronous programming ecosystem inherited the "event loop + callback" pattern above, and then see how it evolved into the native coroutine pattern called `asyncio`.
In [3. Asynchronous I/O evolution path](#3-asynchronous-io-evolution-path), we saw the basic structure of "event loop + callback" that realizes asynchronous programming in a single thread. Certainly, "event loop + callback" can greatly improve the efficiency of a program. However, the problem is not fully solved yet: real projects are very complicated, so the following issues must be considered.
In a real project, these problems are inevitable, and behind them hide the shortcomings of the callback pattern.
```python
def callback_1():
    # processing ...
    def callback_2():
        # processing ...
        def callback_3():
            # processing ...
            def callback_4():
                # processing ...
                def callback_5():
                    # processing ...
                    ...
                async_function(callback_5)
            async_function(callback_4)
        async_function(callback_3)
    async_function(callback_2)
async_function(callback_1)
```
(Lisp fans, please forgive me.)
When writing synchronous code, the related work normally proceeds from top to bottom.
```python
do_a()
do_b()
```
If `do_b()` depends on the result of `do_a()`, and `do_a()` is an asynchronous call, we don't know when the result of `do_a()` will be returned. The subsequent processing must be passed to `do_a()` in the form of a callback, which guarantees that `do_a()` completes before `do_b()` is executed.
```python
do_a(do_b())
```
And if the entire long processing flow is asynchronous, it looks like this:

```python
do_a(do_b(do_c(do_d(do_e(do_f(...))))))
```
The above style is called "callback hell style". However, the main problem is not the appearance, but that the original top-to-bottom structure must be turned from outside to inside: first `do_a()`, then `do_b()`, then `do_c()`, ..., and finally the innermost `do_f()`. In synchronous processing, `do_b()` after `do_a()` means that the thread's instruction pointer controls the flow. With the callback pattern, however, flow control must be carefully arranged by engineers.
The `sock` object in [3-1. Synchronous blocking method](#3-1-synchronous-blocking-method) is reused from top to bottom, whereas the [3-5 callback version](#3-5-asynchronous-non-blocking-improvement) requires instantiating the `Crawler` class and storing the `sock` object in `self`. Without an object-oriented programming style, the state to be shared would have to be passed to each callback function like a relay baton. And you must plan ahead and carefully design which states should be shared between which asynchronous calls.
A series of callbacks makes up a call chain: for example, the chain from `do_a()` to `do_f()`. What if `do_d()` raises an error? The chain breaks, the baton-passed state is lost, and the ["stack trace"](https://ja.wikipedia.org/wiki/スタックトレース) is destroyed. For example, `do_d()` raises an error; since the call to `do_d()` fails inside `do_c()`, `do_c()` itself also raises an error; likewise `do_b()` and `do_a()`, and in the end the error log only reports that "the call to `do_a()` caused an error". Yet it was `do_d()` that actually caused the error. To prevent this, we must catch every error and return it as part of the function's return value, and every callback function must check the return value of the previous function, which prevents errors from being hidden.
So, while readability is merely a matter of appearance, the two drawbacks of breaking the stack trace and making state hard to share and manage make callback-based asynchronous programming extraordinarily difficult. Every programming language is trying to solve this problem; thanks to that, solutions like "Promise" and ["Coroutine"](https://ja.wikipedia.org/wiki/コルーチン) were born.
The "event loop + callback" pattern solved the difficulties of asynchronous programming such as "when the asynchronous task is completed" and "how to handle the return value of the asynchronous call". However, callbacks complicate the program. Before we can think of ways to avoid this shortcoming, we first need to clarify the essence. Why are callbacks mandatory? And what is the reason for sharing and managing the state, which is one of the drawbacks in the first place?
State sharing and management is necessary because programs need to know what they have done, what they are doing, and what they will do next. In other words, a program needs to know its current state, and it also has to pass that state along to each callback like a baton.
Managing state across multiple callbacks is difficult. So why not let each callback manage only its own state? Call chains make error handling difficult. So why not use call chains? But if we don't use call chains, how does the called function know whether the previous one has completed? Then how about letting one task notify the next? A callback, after all, can be viewed as a pending task.
Mutual notification between tasks, with each task having its own state: this is exactly the old programming style of "cooperative multitasking". But we need to schedule it in a single thread. Here, ["coroutines"](https://ja.wikipedia.org/wiki/コルーチン), which have their own stack frame and easily know their own state, come into play. Coroutines can, of course, also notify each other.
Coroutines are ["subroutines"](https://ja.wikipedia.org/wiki/%E3%82%B5%E3%83%96%E3%83%AB%E3%83%BC%E3%83%81 It is a generalization of% E3% 83% B3). The coroutine scheduling strategy is ["Non-preemptive"](http://e-words.jp/w/%E3%83%8E%E3%83%B3%E3%83%97%E3%83%AA%E3 % 82% A8% E3% 83% B3% E3% 83% 97% E3% 83% 86% E3% 82% A3% E3% 83% 96% E3% 83% 9E% E3% 83% AB% E3% 83 % 81% E3% 82% BF% E3% 82% B9% E3% 82% AF.html #: ~: text =% E3% 83% 8E% E3% 83% B3% E3% 83% 97% E3% 83 % AA% E3% 82% A8% E3% 83% B3% E3% 83% 97% E3% 83% 86% E3% 82% A3% E3% 83% 96% E3% 83% 9E% E3% 83% AB % E3% 83% 81% E3% 82% BF% E3% 82% B9% E3% 82% AF% E3% 81% A8% E3% 81% AF% E3% 80% 81% E4% B8% 80% E3 % 81% A4% E3% 81% AE% E5% 87% A6% E7% 90% 86% E8% A3% 85% E7% BD% AE, CPU% E3% 82% 92% E7% AE% A1% E7 % 90% 86% E3% 81% 97% E3% 81% AA% E3% 81% 84% E6% 96% B9% E5% BC% 8F% E3% 80% 82), with multiple entries You can control suspend and resume.
A subroutine is a callable code block defined by a programming language, in other words, a set of instructions packaged to achieve some function. In general programming languages, subroutines are realized by structures such as functions and methods.
Python has a special object, the ["generator"](https://ja.wikipedia.org/wiki/ジェネレータ_%28プログラミング%29). A generator's characteristics are similar to a coroutine's: it can be suspended during an iteration and does not lose its state until the next iteration.
Generators were enhanced in Python 2.5 ([PEP 342](https://www.python.org/dev/peps/pep-0342/)) to enable simple generator-based coroutines. The title of that enhancement proposal is "Coroutines via Enhanced Generators". Thanks to PEP 342, a generator can suspend execution at `yield` and return data. You can also use `send()` to send data into the generator, and `throw()` to raise an error inside the generator and terminate it.
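A minimal sketch of these three operations (the `averager` generator below is made up for the example):

```python
def averager():
    total, count = 0.0, 0
    while True:
        # suspend here; yield the running average, receive the next value
        value = yield (total / count if count else None)
        total += value
        count += 1


avg = averager()
next(avg)              # start the generator; runs until the first yield
print(avg.send(10))    # sends 10 in, resumes, prints 10.0
print(avg.send(20))    # prints 15.0
try:
    avg.throw(ValueError)  # raise ValueError inside the generator
except ValueError:
    print('generator terminated')
```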
Now let's refactor the scraping program with generator-based coroutines.
If we abandon the callback method, how do we know the result of an asynchronous call? Here, we first define an object: when the result of the asynchronous call is returned, it is saved inside. This object is called a `Future`.
```python
import socket
import time
from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ


class Future:
    def __init__(self):
        self.result = None
        self._callbacks = []

    def add_done_callback(self, fn):
        self._callbacks.append(fn)

    def set_result(self, result):
        self.result = result
        for fn in self._callbacks:
            fn(self)
```
The `Future` object has a `result` instance variable that stores the future execution result. The `set_result()` method sets the `result` and, after binding the value, runs the callback functions previously added to the `Future` object. Callbacks are added with the `add_done_callback()` method.
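A tiny usage sketch of this `Future` class (the values are made up for illustration):

```python
f = Future()
f.add_done_callback(lambda fut: print('got:', fut.result))
f.set_result(42)  # prints "got: 42" via the registered callback
```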
Wait, didn't I say we're abandoning the callback method? Don't panic. As explained [above](#3-5-4-summary), asynchronous programming cannot escape the "event loop + callback" pattern. The callback here is just a little different from the previous one.
Anyway, we created a `Future` object that represents data of the future. Let's refactor the scraping code with the `Future` object.
```python
class Crawler:
    def __init__(self, path):
        self.path = path
        self.response = b''

    def fetch(self):
        sock = socket.socket()
        sock.setblocking(False)
        try:
            sock.connect(('example.com', 80))
        except BlockingIOError:
            pass
        f = Future()

        def on_connected():
            f.set_result(None)

        # fileno() returns the socket's file descriptor as a short integer
        selector.register(sock.fileno(), EVENT_WRITE, on_connected)
        yield f
        selector.unregister(sock.fileno())
        get = f'GET {self.path} HTTP/1.0\r\nHost: example.com\r\n\r\n'
        sock.send(get.encode('ascii'))

        global stopped
        while True:
            f = Future()

            def on_readable():
                f.set_result(sock.recv(4096))

            selector.register(sock.fileno(), EVENT_READ, on_readable)
            # receive the result passed in via send()
            chunk = yield f
            selector.unregister(sock.fileno())
            if chunk:
                self.response += chunk
            else:
                paths_todo.remove(self.path)
                if not paths_todo:
                    stopped = True
                break
```
Compared to the previous callback version, this makes a big difference. The `fetch` method now uses `yield` expressions, which makes it a generator. A generator is started once with `next()` or `send(None)` and is suspended when it reaches a `yield`. So how is the `fetch` generator resumed?
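Before answering, here is a minimal reminder of the mechanism (a made-up generator): `send(None)` starts it, and each subsequent `send()` resumes it from the last `yield`.

```python
def gen():
    received = yield 'first'   # suspends here after yielding 'first'
    yield f'second, got {received}'


g = gen()
print(g.send(None))  # starts the generator, prints 'first'
print(g.send('hi'))  # resumes it, prints 'second, got hi'
```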
To solve this, we follow one rule: the ["single-responsibility principle"](https://en.wikipedia.org/wiki/Single-responsibility_principle). So we create something whose role is to resume the generator and manage its state. We name it `Task`.
```python
class Task:
    def __init__(self, coro):
        # coroutine object
        self.coro = coro
        f = Future()
        f.set_result(None)
        self.step(f)

    def step(self, future):
        try:
            # when sent, the generator runs until the next yield;
            # next_future is the object returned by that yield
            next_future = self.coro.send(future.result)
        except StopIteration:
            return
        next_future.add_done_callback(self.step)
```
The `Task` object in the code above wraps a `coro` coroutine object. Since the task to be managed is a pending task (a coroutine), `coro` here is the `fetch` generator. There is also a `step()` method, executed once at initialization. `step()` calls the generator's `send()` method; at initialization this is `send(None)`, which performs the initial iteration of `coro`, i.e., of `fetch()`.
After `send()` completes, we get the next `future` and use `add_done_callback()` to add the `step()` callback function to that next `future`.
Next, look at the `fetch()` generator. The business logic, such as sending the request and receiving the response, is completed inside it, and the callback functions registered in the `selector` have also become simple. The two `yield`s return the corresponding `future`s and receive results via `Task.step()`. We have now successfully connected `Task`, `Future`, and the coroutine.
We initialize the `Task` object, and `fetch()` runs until the first `yield`. So how is it resumed?
Enter the event loop again. When the first `yield` is reached, we wait for the registered `EVENT_WRITE` to occur. Once the event loop starts pulsing, like a heartbeat, it keeps going.
```python
def loop():
    while not stopped:
        # block until some event occurs
        events = selector.select()
        for event_key, event_mask in events:
            callback = event_key.data
            callback()


if __name__ == '__main__':
    elapsed_times = 0
    for _ in range(10):
        selector = DefaultSelector()
        stopped = False
        paths_todo = {'/', '/1', '/2', '/3', '/4', '/5', '/6', '/7', '/8', '/9'}
        start = time.time()
        for path in paths_todo:
            crawler = Crawler(path)
            Task(crawler.fetch())
        loop()
        elapsed_time = time.time() - start
        elapsed_times += elapsed_time
        print(f"elapsed_time: {(elapsed_time):.2f}[sec]")
    print(f"mean_elapsed_time: {(elapsed_times/10):.2f}[sec]")
```
Execution result:

```
elapsed_time: 0.30[sec]
elapsed_time: 0.28[sec]
elapsed_time: 0.28[sec]
elapsed_time: 0.29[sec]
elapsed_time: 0.28[sec]
elapsed_time: 0.25[sec]
elapsed_time: 0.26[sec]
elapsed_time: 0.26[sec]
elapsed_time: 0.28[sec]
elapsed_time: 0.26[sec]
mean_elapsed_time: 0.27[sec]
```
The average over 10 runs is 0.27 seconds. This `loop` is a little different from before: `callback()` no longer receives `event_key` and `event_mask`. In other words, the callback here doesn't need to know who triggered the event. Combined with `fetch()`, we can see that the callback just sets a value into the `future` with `set_result()`. It doesn't even need to know which `future` it is: the coroutine saves its own state, and it is enough for each coroutine to know its own `future`. After the value is set, the coroutine handles everything else.
Callback style:

- Shared state (such as `sock`) must be carried from callback to callback.
- Each callback needs to know which event triggered it (`event_key`, `event_mask`) before it can proceed.

Generator coroutine style:

- The `selector` callback is separated from the business logic: it merely sets a value in a `future`.
- `callback()` in `loop` no longer needs to know who triggered the event.
- State that must be shared (such as `sock`) lives inside the coroutine.
Still, the code is a bit hard to read. What if you were asked to improve the fault tolerance and functionality of `fetch`? Moreover, technical logic (`socket`-related) and business logic (request and response processing) are mixed together, which is not good. There are places that could be abstracted:
- The `socket` connection (as a function / method)
- The loop reading `socket.recv()`
in these places, and if you want to abstract it, you need to make it a generator. Besides, fetch ()
itself is a generator, and playing around with the generator inside the generator can make the code even more complicated.
The Python designers also noticed this problem and provided a toy to play with the generator in a generator called yield from
.
`yield from` is syntax introduced in Python 3.3 ([PEP 380](https://www.python.org/dev/peps/pep-0380/)). PEP 380 is primarily intended to remove the inconvenience of driving a generator from inside a generator, and it has two features.
One is the ability to `yield from` a sub-generator directly, without having to iterate over it with `yield`. The following two generators are functionally equivalent.
```python
def gen_1():
    sub_gen = range(10)
    yield from sub_gen


def gen_2():
    sub_gen = range(10)
    for item in sub_gen:
        yield item
```
The other is the ability to open intercommunication channels between the sub-generator and the main generator.
```python
def gen():
    yield from sub_gen()


def sub_gen():
    while 1:
        x = yield
        yield x + 1


def main():
    g = gen()
    next(g)  # run until the first yield
    retval = g.send(1)  # appears to send data to gen(), but actually sends to sub_gen()
    print(retval)  # outputs the 2 calculated by sub_gen()
    g.throw(StopIteration)  # raise an error inside sub_gen()
```
The code above shows the intercommunication feature of `yield from`: `yield from` opens a communication channel inside `gen()` between `sub_gen()` and `main()`. The data `1` can be sent directly from `main()` to `sub_gen()`, and the calculated value `2` is returned directly from `sub_gen()` to `main()`. You can also terminate `sub_gen()` by throwing an error into it directly from `main()`.
By the way, `yield from` works not only as `yield from <generator>` but also as `yield from <iterable>`.
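For example (a trivial illustration):

```python
def flatten():
    yield from [1, 2, 3]  # an iterable works just as well as a generator
    yield from 'ab'       # any iterable: lists, strings, ranges, ...


print(list(flatten()))  # [1, 2, 3, 'a', 'b']
```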
Abstracting the `socket` connection:
```python
def connect(sock, address):
    f = Future()
    sock.setblocking(False)
    try:
        sock.connect(address)
    except BlockingIOError:
        pass

    def on_connected():
        f.set_result(None)

    selector.register(sock.fileno(), EVENT_WRITE, on_connected)
    yield from f
    selector.unregister(sock.fileno())
```
Abstracting the reading of the response:
```python
def read(sock):
    f = Future()

    def on_readable():
        f.set_result(sock.recv(4096))

    selector.register(sock.fileno(), EVENT_READ, on_readable)
    chunk = yield from f
    selector.unregister(sock.fileno())
    return chunk


def read_all(sock):
    response = []
    chunk = yield from read(sock)
    while chunk:
        response.append(chunk)
        chunk = yield from read(sock)
    return b''.join(response)
```
Refactoring `Crawler`:
```python
class Crawler:
    def __init__(self, path):
        self.path = path
        self.response = b''

    def fetch(self):
        global stopped
        sock = socket.socket()
        yield from connect(sock, ('example.com', 80))
        get = f'GET {self.path} HTTP/1.0\r\nHost: example.com\r\n\r\n'
        sock.send(get.encode('ascii'))
        self.response = yield from read_all(sock)
        paths_todo.remove(self.path)
        if not paths_todo:
            stopped = True
```
The code so far is fine: the reusable parts have been abstracted into functions, and the values of sub-generators are obtained with `yield from`. Note, however, that when returning a `future` object we now use `yield from` instead of `yield`. `yield` works with any ordinary Python object, but `yield from` does not. We need to modify `Future` into an iterable object.
```python
class Future:
    def __init__(self):
        self.result = None
        self._callbacks = []

    def add_done_callback(self, fn):
        self._callbacks.append(fn)

    def set_result(self, result):
        self.result = result
        for fn in self._callbacks:
            fn(self)

    def __iter__(self):
        yield self
        return self.result
```
All we added is `__iter__`. Of course, you don't necessarily have to use `yield from`; you could keep `yield`. However, it is better to use them distinctly, to distinguish generator-based coroutines from plain generators. For this reason, after `yield from` was introduced in Python 3.3, creating coroutines with bare `yield` was deprecated. There is also the advantage that the intercommunication feature of `yield from` lets coroutines send data to each other freely.
By improving coroutines with `yield from`, we raised the level of abstraction of the code and further simplified the business logic. The intercommunication feature makes it easy to exchange data between coroutines. Python asynchronous programming took a big step forward here.
And the developers of the Python language took full advantage of `yield from`. `Tulip`, the Python asynchronous programming framework led by Guido, also evolved at tremendous speed, changing its name to `asyncio` for Python 3.4 and being adopted into the standard library on a provisional basis.
## 4-6. asyncio

`asyncio` is an asynchronous I/O framework experimentally introduced in Python 3.4 ([PEP 3156](https://www.python.org/dev/peps/pep-3156/)). It was provided as an infrastructure for coroutine-based asynchronous I/O programming in Python. Its core components are the event loop, coroutines, tasks, futures, and other auxiliary modules.
When `asyncio` was introduced, a decorator called `@asyncio.coroutine` was also provided. Attaching it to a function that uses `yield from` marks that function as a coroutine, but you are not forced to use it.
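For illustration, a minimal sketch of that old style (note that `@asyncio.coroutine` was deprecated in Python 3.8 and removed in 3.11, so this only runs on older versions):

```python
import asyncio


@asyncio.coroutine
def slow_hello():
    # the decorator marks this yield from-style generator as a coroutine
    yield from asyncio.sleep(1)
    return 'hello'


loop = asyncio.get_event_loop()
print(loop.run_until_complete(slow_hello()))
```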
With the help of `yield from`, coroutines became easier to create from Python 3.4 on, but as a historical artifact, people struggled to understand the distinction and relationship between **generators** and **coroutines**, and the difference between `yield` and `yield from`. This confusion violates the rules of "The Zen of Python".
So, starting with Python 3.5, the Python designers added the `async` / `await` syntax ([PEP 492](https://www.python.org/dev/peps/pep-0492/)), showing explicit support for coroutines. These are called **native coroutines**. The two coroutine styles, `async` / `await` and `yield from`, share the same internal implementation and are compatible with each other.
And since Python 3.6, `asyncio` has officially joined the standard library. That is the evolutionary trajectory of asynchronous I/O in CPython.
Let's experience the convenience of `asyncio` and the `async` / `await` syntax.
```python
import asyncio
import aiohttp
import time

loop = asyncio.get_event_loop()


async def fetch(path):
    async with aiohttp.ClientSession(loop=loop) as session:
        async with session.get(path) as response:
            response = await response.read()
            return response


if __name__ == '__main__':
    host = 'http://example.com'
    paths_todo = {'/', '/1', '/2', '/3', '/4', '/5', '/6', '/7', '/8', '/9'}
    elapsed_times = 0
    for _ in range(10):
        start = time.time()
        tasks = [fetch(host + path) for path in paths_todo]
        loop.run_until_complete(asyncio.gather(*tasks))
        elapsed_time = time.time() - start
        elapsed_times += elapsed_time
        print(f"elapsed_time: {(elapsed_time):.2f}[sec]")
    print(f"mean_elapsed_time: {(elapsed_times/10):.2f}[sec]")
```
Execution result:

```
elapsed_time: 0.27[sec]
elapsed_time: 0.25[sec]
elapsed_time: 0.25[sec]
elapsed_time: 0.26[sec]
elapsed_time: 0.26[sec]
elapsed_time: 0.25[sec]
elapsed_time: 0.28[sec]
elapsed_time: 0.25[sec]
elapsed_time: 0.25[sec]
elapsed_time: 0.25[sec]
mean_elapsed_time: 0.26[sec]
```
Compared to the generator-based coroutine style, the `asyncio` style is quite different:

- `yield` and `yield from` are gone; `async` / `await` is used instead
- Our hand-made `loop()` is gone; `asyncio.get_event_loop()` is used instead
- There is no need to handle the `socket` yourself; the `aiohttp` library does it for you
- Our hand-made `Future` and `Task` are gone; `asyncio` implements them
by yourself to send HTTP requests and receive responses is that it is extremely difficult to handle the HTTP protocol well in actual business, and if you have an asynchronous HTTP client with full functionality, you can do it yourself. Because you don't have to do it.
Compared to the synchronous blocking version, the code is just as simple. We've taken a closer look at the evolution and mechanics of asynchronous programming in Python, and in the end achieved N times the efficiency with code as simple as synchronous processing, without the drawbacks of callback hell.
More details on how to use `asyncio`, its strengths and weaknesses, and how it is positioned relative to other asynchronous I/O solutions in the Python ecosystem will be discussed in another article.
References:

- A Web Crawler With asyncio Coroutines
- Latency numbers every programmer should know