Je suis sûr que beaucoup de gens ont entendu parler de la programmation asynchrone. Par exemple, le JavaScript utilisé dans le frontal est un langage à un seul thread qui ne bloque pas le thread principal, de sorte que diverses fonctions sont implémentées pour être asynchrones. Node.js hérite également de cette propriété et convient parfaitement aux tâches liées aux E / S. Cependant, en ce qui concerne Python, il prend en charge le traitement parallèle et parallèle, de sorte que la plupart des gens n'auront pas d'expérience en matière de programmation asynchrone dans leurs propres projets. Bien sûr, Tornado, Twisted et [Gevent](http: //www.gevent. Beaucoup de gens ont utilisé des frameworks asynchrones tels que org /) parce qu'ils sont célèbres, mais lorsque vous rencontrez des erreurs étranges, vous ne pourrez pas les résoudre.
Comme vous pouvez le voir d'après les tendances récentes de PyCon, la programmation asynchrone est sans doute la prochaine tendance de l'écosystème Python. En outre, les langages de programmation émergents tels que Go et Rust vendent un traitement asynchrone et un traitement parallèle hautes performances. Puisque Python ne doit pas être vaincu, Guido, le créateur de Python, a commencé à développer lui-même le projet Tulip (asyncio) en 2013.
Tout d'abord, je voudrais expliquer le concept associé, puis le traitement asynchrone.
Le blocage est partout. Par exemple, le processeur est "[Context Switch](https://ja.wikipedia.org/wiki/%E3%82%B3%E3%83%B3%E3%83%86%E3%82%AD%E3%" 82% B9% E3% 83% 88% E3% 82% B9% E3% 82% A4% E3% 83% 83% E3% 83% 81 #: ~: texte =% E3% 82% B3% E3% 83% B3% E3% 83% 86% E3% 82% AD% E3% 82% B9% E3% 83% 88% E3% 82% B9% E3% 82% A4% E3% 83% 83% E3% 83% 81% 20 (contexte% 20 interrupteur)% 20% E3% 81% A8,% E4% B8% 8D% E5% 8F% AF% E6% AC% A0% E3% 81% AA% E6% A9% 9F% E8% 83% Lors de l'exécution de "BD% E3% 81% A7% E3% 81% 82% E3% 82% 8B% E3% 80% 82)", les autres processus ne peuvent pas traiter et sont bloqués. (Dans le cas du multi-core, le core avec le changement de contexte devient indisponible)
Le non-blocage est le côté opposé du blocage, et le blocage d'un certain processus réduit l'efficacité du calcul, de sorte que le processus est rendu non bloquant.
La méthode de communication ci-dessus fait référence à "primitive de synchronisation" dans le traitement asynchrone et le traitement parallèle. Par exemple, semapho, lock, file d'attente de synchronisation, etc. Ces méthodes de communication permettent de synchroniser plusieurs programmes sous certaines conditions. Et comme il existe un traitement asynchrone, ces méthodes de communication sont nécessaires. La raison en est que si tous les programmes effectuent un traitement synchrone, ils sont traités dans l'ordre depuis le début, aucune méthode de communication n'est donc requise.
Pour le traitement parallèle / parallèle, reportez-vous à Article précédent.
Le programme doit être divisé en plusieurs tâches pour prendre en charge la concurrence. Bloquant / non bloquant et synchrone / asynchrone sont définis pour chaque tâche. Par conséquent, parallèle, asynchrone et non bloquant sont étroitement liés.
Synchrone / asynchrone et bloquant / non bloquant ne sont pas incompatibles. Par exemple, un site EC effectue un traitement asynchrone pour les demandes d'accès multi-utilisateurs, mais les mises à jour d'inventaire doivent être un traitement synchrone.
En conséquence, la plupart des frameworks asynchrones simplifient le modèle de programmation asynchrone (n'autorisant qu'un seul événement à la fois). Les discussions sur la programmation asynchrone se concentrent sur celles à thread unique.
Par conséquent, si vous adoptez la programmation asynchrone, vous devez réduire la taille de chaque appel asynchrone. «Petit» signifie ici raccourcir le temps de calcul. Et comment diviser le programme en tâches asynchrones devient un défi.
Comme mentionné précédemment, la programmation asynchrone présente de nombreux inconvénients. "Asyncio", que le créateur de Python a créé par lui-même pendant quatre ans, est devenu une bibliothèque standard dans Python 3.6. Mais pourquoi devons-nous rendre les choses si difficiles? La raison en est que la programmation asynchrone est très utile.
En supposant que le nombre d'horloges du processeur est de «2,6 Ghz», c'est-à-dire qu'il est possible de traiter des instructions de 2,6 $ \ times10 ^ 9 $ par seconde, et le temps requis pour chaque traitement d'instruction est de «0,38 ns». «0.38ns» est la plus petite unité dans le sens du temps de la CPU. Et que la plus petite unité du sens humain du temps soit «1». Le tableau ci-dessous est un calcul du sens du temps du CPU basé sur le sens humain du temps. En outre, les données de retard CPU se réfèrent aux "Numéros de latence que tout programmeur devrait connaître".
En traitement | Délai réel | Sens du temps CPU |
---|---|---|
Traitement des instructions | 0.38ns | 1s |
Lire le cache L1 | 0.5ns | 1.3s |
Correction de la prédiction de branche | 5ns | 13s |
Lire le cache L2 | 7ns | 18.2s |
contrôle d'exclusion | 25ns | 1m5s |
Référence mémoire | 100ns | 4m20s |
Changement de contexte | 1.5µs | 1h5m |
Télécharger 2 Ko de données sur un réseau 1 Gbps | 20µs | 14.4h |
Lire 1 Mo de données continues de la mémoire | 250µs | 7.5day |
Ping l'hôte du même centre de données Internet (aller-retour) | 0.5ms | 15day |
Lire 1 Mo de données en continu à partir du SSD | 1ms | 1month |
Lire 1 Mo de données en continu sur le disque dur | 20ms | 20month |
Ping un hôte dans une autre préfecture (aller-retour) | 150ms | 12.5year |
Redémarrez la machine virtuelle | 4s | 300year |
Redémarrez le serveur | 5m | 25000year |
Le processeur est le cœur de traitement de l'ordinateur et constitue une ressource précieuse. Si vous perdez du temps d'exécution du processeur et réduisez l'utilisation, votre programme deviendra inévitablement moins efficace. Comme le montre le tableau ci-dessus, télécharger 2 Ko de données sur un réseau de 1 Gbit / s équivaut à passer 14 heures au sens du processeur. S'il s'agit d'un réseau à 10 Mbps, l'efficacité sera 100 fois inférieure. Le fait de simplement faire attendre le CPU pendant ce temps et de ne pas le transférer vers un autre traitement est juste un gaspillage de la «jeunesse» du CPU.
Si un programme ne fait pas bon usage des ressources informatiques, il aura besoin de plus d'ordinateurs pour combler le vide. Par exemple, la refonte d'un programme de scraping avec une programmation asynchrone peut réduire les 7 serveurs requis à l'origine à 3 et réduire les coûts de 57%. À propos, sur AWS, une instance réservée de m4.xlarge coûte environ 150 000 yens par an.
Si vous ne vous souciez pas de l'argent, vous vous souciez vraiment de l'efficacité. Si vous augmentez le nombre de serveurs à un certain nombre, vous devez améliorer l'architecture et la conception du programme, ou même si vous l'augmentez davantage, les performances risquent de ne pas s'améliorer. Et les coûts de gestion seront considérablement augmentés. Lors d'une réservation pour PS5 ou XBOX Series X, il est possible que le site EC donne une erreur 503 car il s'agit d'un problème architectural plutôt que du nombre de serveurs.
"Problème C10K" a été soumis en 1999 et comment le faire 1Ghz C'est comme un défi de pouvoir fournir un service FTP à 10000 clients en même temps avec un seul serveur dans un environnement réseau de CPU, de mémoire 2G et de 1 Gbps. Depuis 2010, en raison de l'amélioration des performances du matériel, le "problème C10M" a été soumis après C10K. Le problème C10M est un problème qui gère 1 million d'accès simultanés par seconde dans un environnement réseau avec 8 cœurs de processeur, 64 Go de mémoire et 10 Gbps.
Le coût et l'efficacité sont des problèmes du point de vue de la gestion d'entreprise, et le problème C10K / C10M pose un défi technique au matériel. Si le problème C10K / C10M peut être résolu, les problèmes de coût et d'efficacité seront résolus en même temps.
Le processeur est très rapide, mais le changement de contexte, la lecture de la mémoire, la lecture du disque dur et la communication réseau sont très lents. En d'autres termes, loin du CPU, tout sauf le cache L1 est lent. Un ordinateur se compose de cinq périphériques principaux: périphérique d'entrée, périphérique de sortie, périphérique de stockage, périphérique de contrôle et périphérique de calcul. Le périphérique de contrôle et le périphérique de calcul se trouvent dans le CPU, mais les autres sont tous des E / S. La lecture et l'écriture de la mémoire, la lecture et l'écriture du disque dur, ainsi que la lecture et l'écriture sur la carte d'interface réseau sont toutes des E / S. Les E / S sont le plus gros goulot d'étranglement.
Les programmes asynchrones peuvent être plus efficaces, mais le plus gros goulot d'étranglement est les E / S, donc la solution est "E / S asynchrones". Il s'appelle 9D% 9E% E5% 90% 8C% E6% 9C% 9FIO).
Le plus grand programme de la planète est probablement Internet. Et dans le tableau de [CPU time sense](# 2-1-cpu time sense), vous pouvez voir que les E / S réseau sont plus lentes que les E / S du disque dur et constituent le plus gros goulot d'étranglement. Divers frameworks asynchrones sont destinés aux E / S réseau, car rien n'est plus lent que les E / S réseau, à l'exception du redémarrage du serveur.
Prenons le grattage comme exemple. Ici, nous allons télécharger 10 pages Web sur le net.
La méthode la plus simple consiste à télécharger dans l'ordre. Il est exécuté dans l'ordre d'établissement de la connexion «socket», de la transmission de la demande et de la réception de la réponse.
import socket
import time
def blocking_way():
sock = socket.socket()
# blocking
sock.connect(('example.com', 80))
request = 'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n'
sock.send(request.encode('ascii'))
response = b''
chunk = sock.recv(4096)
while chunk:
response += chunk
# blocking
chunk = sock.recv(4096)
return response
def sync_way():
res = []
for i in range(10):
res.append(blocking_way())
return len(res)
if __name__ == '__main__':
elapsed_times = 0
for _ in range(10):
start = time.time()
sync_way()
elapsed_time = time.time() - start
elapsed_times += elapsed_time
print(f"elapsed_time: {(elapsed_time):.2f}[sec]")
print(f"mean_elapsed_time: {(elapsed_times/10):.2f}[sec]")
Résultat de l'exécution:
elapsed_time: 2.76[sec]
elapsed_time: 2.58[sec]
elapsed_time: 2.56[sec]
elapsed_time: 2.58[sec]
elapsed_time: 2.85[sec]
elapsed_time: 2.66[sec]
elapsed_time: 2.60[sec]
elapsed_time: 3.38[sec]
elapsed_time: 2.88[sec]
elapsed_time: 2.67[sec]
mean_elapsed_time: 2.75[sec]
Le temps moyen pour 10 fois est de 2,75 secondes. La fonction blocking_way ()
établit une connexion socket
, envoie une requête HTTP, lit la réponse HTTP à partir de la socket
et renvoie les données. Et la fonction sync_way ()
vient de la répéter 10 fois. Dans le code ci-dessus, sock.connect (('example.com', 80))
envoie une requête au serveur numéro 80, et sock.recv (4096) ʻest 4 Ko de données d'octets de
socket`. Lire.
Lorsqu'une connexion réseau est établie n'est pas déterminée par le côté client, mais par l'environnement réseau et la puissance de traitement du serveur. Et il est imprévisible quand les données seront renvoyées par le serveur. Par conséquent, par défaut, «sock.connect ()» et «sock.recv ()» sont bloqués. D'autre part, sock.send ()
est long et ne bloque pas. sock.send ()
retourne la valeur de retour dès que les données de la requête sont copiées dans le tampon de la pile de protocoles TCP / IP, donc il n'attend pas une réponse du serveur.
Si l'environnement réseau est très mauvais et qu'il faut 1 seconde pour établir une connexion, sock.connect ()
bloquera pendant 1 seconde. Cette seconde ressemble à 83 ans pour un processeur à 2,6 GHz. Au cours des 83 dernières années, le CPU n'a rien pu faire. De même, sock.recv ()
doit attendre que le client reçoive une réponse du serveur. Après avoir téléchargé la page d'accueil example.com 10 fois, répétez ce blocage 10 fois. Mais qu'en est-il du scraping à grande échelle, qui télécharge 10 millions de pages Web par jour?
En résumé, les E / S réseau, telles que le blocage synchrone, sont très inefficaces, en particulier parmi les programmes qui communiquent fréquemment. Une telle méthode ne peut pas résoudre C10K / C10M.
S'il faut du temps pour exécuter le même programme 10 fois, vous pouvez exécuter le même programme 10 fois en même temps. Par conséquent, nous allons introduire un multi-processus. À propos, dans le système d'exploitation avant Linux 2.4, le processus est l'entité de la tâche, et le système d'exploitation a été conçu pour être orienté processus.
import socket
import time
from concurrent.futures import ProcessPoolExecutor
def blocking_way():
sock = socket.socket()
# blocking
sock.connect(('example.com', 80))
request = 'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n'
sock.send(request.encode('ascii'))
response = b''
chunk = sock.recv(4096)
while chunk:
response += chunk
# blocking
chunk = sock.recv(4096)
return response
def multi_process_way():
with ProcessPoolExecutor(max_workers=10) as executor:
futures = {executor.submit(blocking_way) for i in range(10)}
return len([future.result() for future in futures])
if __name__ == '__main__':
elapsed_times = 0
for _ in range(10):
start = time.time()
multi_process_way()
elapsed_time = time.time() - start
elapsed_times += elapsed_time
print(f"elapsed_time: {(elapsed_time):.2f}[sec]")
print(f"mean_elapsed_time: {(elapsed_times/10):.2f}[sec]")
Résultat de l'exécution:
elapsed_time: 0.49[sec]
elapsed_time: 0.47[sec]
elapsed_time: 0.48[sec]
elapsed_time: 0.49[sec]
elapsed_time: 0.54[sec]
elapsed_time: 0.51[sec]
elapsed_time: 0.56[sec]
elapsed_time: 0.52[sec]
elapsed_time: 0.47[sec]
elapsed_time: 0.47[sec]
mean_elapsed_time: 0.50[sec]
Le temps moyen pour 10 fois est de 0,50 seconde. C'était efficace. Cependant, le problème est qu'il ne s'agit pas d'un dixième de la méthode de blocage synchrone. La raison en est que le processeur de l'environnement d'exécution n'est pas de 10 cœurs et qu'il est nécessaire de changer de processus.
Les processus de commutation ne sont pas aussi bon marché que le changement de contexte du processeur indiqué dans [CPU Time Sense](# 2-1-cpu Time Sense). Lorsque le processeur passe d'un processus à un autre, il enregistre d'abord tous les états de registre et de mémoire de l'exécution du processus d'origine, puis restaure l'état enregistré de l'autre processus. Pour le CPU, c'est comme attendre quelques heures. Lorsque le nombre de processus est supérieur au nombre de cœurs de processeur, il est nécessaire de changer de processus.
Outre la commutation, le multi-processus présente un autre inconvénient. Comme un serveur normal fonctionne dans un état stable, le nombre de processus pouvant être traités simultanément est limité à des dizaines à des centaines. Trop de processus peuvent rendre votre système instable et manquer de ressources mémoire.
Et en plus de la commutation et à petite échelle, le multi-processus présente des problèmes tels que le partage d'état et de données.
La structure de données des threads est plus légère que celle des processus et vous pouvez avoir plusieurs threads dans un processus. Le système d'exploitation plus récent que Linux 2.4 a également changé l'unité minimum planifiable d'un processus à l'autre. Le processus existe simplement en tant que conteneur pour les threads et joue désormais le rôle de gestion des ressources. Les threads au niveau du système d'exploitation sont distribués à chaque cœur de la CPU et peuvent être exécutés simultanément.
import socket
import time
from concurrent.futures import ThreadPoolExecutor
def blocking_way():
sock = socket.socket()
# blocking
sock.connect(('example.com', 80))
request = 'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n'
sock.send(request.encode('ascii'))
response = b''
chunk = sock.recv(4096)
while chunk:
response += chunk
# blocking
chunk = sock.recv(4096)
return response
def multi_thread_way():
with ThreadPoolExecutor(max_workers=10) as executor:
futures = {executor.submit(blocking_way) for i in range(10)}
return len([future.result() for future in futures])
if __name__ == '__main__':
elapsed_times = 0
for _ in range(10):
start = time.time()
multi_thread_way()
elapsed_time = time.time() - start
elapsed_times += elapsed_time
print(f"elapsed_time: {(elapsed_time):.2f}[sec]")
print(f"mean_elapsed_time: {(elapsed_times/10):.2f}[sec]")
Résultat de l'exécution:
elapsed_time: 0.31[sec]
elapsed_time: 0.32[sec]
elapsed_time: 0.30[sec]
elapsed_time: 0.31[sec]
elapsed_time: 0.31[sec]
elapsed_time: 0.26[sec]
elapsed_time: 0.30[sec]
elapsed_time: 0.31[sec]
elapsed_time: 0.30[sec]
elapsed_time: 0.27[sec]
mean_elapsed_time: 0.30[sec]
Le temps moyen pour 10 fois est de 0,30 seconde. Comme prévu, c'était plus rapide que le multi-processus. Le multithreading semble résoudre le problème de la lenteur de la commutation de processus multiprocessus, et l'échelle des tâches pouvant être traitées simultanément est passée de centaines à des milliers de multiprocessus.
Cependant, il existe également un problème avec le multithreading. Premièrement, le multithreading de Python est ["GIL"](https://ja.wikipedia.org/wiki/%E3%82%B0%E3%83%AD%E3%83%BC%E3%83%90% E3% 83% AB% E3% 82% A4% E3% 83% B3% E3% 82% BF% E3% 83% 97% E3% 83% AA% E3% 82% BF% E3% 83% AD% E3% Avec la présence de 83% 83% E3% 82% AF), il y a un problème en ce que l'avantage du processeur multicœur ne peut pas être utilisé. Un seul thread est autorisé à être actif à un moment donné dans un processus Python. Alors, pourquoi le multithreading était-il plus rapide que le multiprocessus?
La raison en est que lors de l'appel d'un appel système bloquant comme sock.connect ()
, sock.recv ()
, le thread actuel libère le GIL, donnant aux autres threads une chance de s'exécuter. Cependant, si vous êtes dans un seul thread, l'appel système de blocage sera bloqué tel quel.
Connaissance des haricots: «Time.sleep» de Python est un processus de blocage, mais dans la programmation multithread, «time.sleep ()» ne bloque pas les autres threads.
Outre GIL, il existe des problèmes courants avec le multithreading. Les threads sont planifiés sur le système d'exploitation et leur "stratégie de planification" est ["préemption"](https://ja.wikipedia.org/wiki/%E3%83%97%E3%83%AA%E3 % 82% A8% E3% 83% B3% E3% 83% 97% E3% 82% B7% E3% 83% A7% E3% 83% B3), les threads de même priorité s'exécutent avec des chances égales Est garanti d'être. La préemption est une stratégie du premier arrivé, premier servi, il est donc imprévisible quel thread sera exécuté et quel code sera exécuté à la prochaine fois, et ["état de la concurrence"](https://ja.wikipedia.org/ Il peut s'agir de wiki /% E7% AB% B6% E5% 90% 88% E7% 8A% B6% E6% 85% 8B).
Par exemple, lorsqu'un thread de travail de scraping interroge l'URL suivante à extraire de la file d'attente des tâches, la question est de savoir laquelle passer si plusieurs threads viennent interroger en même temps. Par conséquent, des verrous et des files d'attente de synchronisation sont nécessaires pour empêcher la même tâche d'être exécutée plusieurs fois.
En outre, le multithreading peut gérer des centaines à des milliers de tâches multitâches simultanément, mais il est toujours insuffisant pour les systèmes Web importants et fréquents. Bien sûr, le plus gros problème avec le multithreading reste le conflit.
3-4. Méthode non bloquante asynchrone Enfin, nous avons atteint la méthode non bloquante. Voyons d'abord comment fonctionne le non-bloquant le plus primitif.
import socket
import time
def nonblocking_way():
sock = socket.socket()
sock.setblocking(False)
#Parce que socket envoie une erreur lors de l'envoi d'une demande de connexion non bloquante
try:
sock.connect(('example.com', 80))
except BlockingIOError:
pass
request = 'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n'
data = request.encode('ascii')
#Répéter l'envoi car le socket ne peut pas prédire quand une connexion sera établie
while 1:
try:
sock.send(data)
break
except OSError:
pass
response = b''
#Répéter la réception car elle est imprévisible lorsque la réponse peut être lue
while 1:
try:
chunk = sock.recv(4096)
while chunk:
response += chunk
# blocking
chunk = sock.recv(4096)
break
except OSError:
pass
return response
def sync_way():
res = []
for i in range(10):
res.append(nonblocking_way())
return len(res)
if __name__ == '__main__':
elapsed_times = 0
for _ in range(10):
start = time.time()
sync_way()
elapsed_time = time.time() - start
elapsed_times += elapsed_time
print(f"elapsed_time: {(elapsed_time):.2f}[sec]")
print(f"mean_elapsed_time: {(elapsed_times/10):.2f}[sec]")
Résultat de l'exécution:
elapsed_time: 2.71[sec]
elapsed_time: 2.82[sec]
elapsed_time: 2.73[sec]
elapsed_time: 2.69[sec]
elapsed_time: 2.82[sec]
elapsed_time: 2.72[sec]
elapsed_time: 2.51[sec]
elapsed_time: 2.65[sec]
elapsed_time: 2.75[sec]
elapsed_time: 3.50[sec]
mean_elapsed_time: 2.79[sec]
Le temps moyen pour 10 fois est de 2,79 secondes. Je me sens trompé. Le temps de calcul est du même niveau que le blocage synchrone, mais le code est plus compliqué. Vous pensez peut-être que vous n'avez pas besoin de non-blocage.
Premièrement, dans le code ci-dessus, sock.setblocking (False)
a demandé au système d'exploitation de changer l'appel système bloquant pour socket
en non bloquant. Comme mentionné précédemment, le non-blocage, c'est que lorsque vous faites une chose, cela n'empêche pas le programme qui vous a appelé de faire l'autre. Le code ci-dessus ne se bloque certainement plus après avoir exécuté sock.connect ()
et sock.recv ()
.
Et le code est compliqué car il ne bloque plus. Lorsque connect ()
est appelé, le système d'exploitation déclenche d'abord une erreur, vous devez donc l'attraper avec try
ici. Et ici, sans blocage, on passe immédiatement au code ci-dessous.
Je répète l'instruction while
et exécute send ()
carconnect ()
devient non bloquant et je ne sais pas quand la connexion sera établie, je dois donc continuer à essayer. Et même si send ()
est exécuté, comme on ne sait pas quand la réponse viendra, l'appel à recv ()
est également exécuté à plusieurs reprises.
connect ()
et recv ()
ne bloquent plus le programme principal, mais le temps CPU disponible n'est pas utilisé efficacement. J'ai passé ce temps à répéter la lecture et l'écriture de «socket» et à gérer les erreurs dans la boucle «while».
Et comme 10 téléchargements sont toujours exécutés dans l'ordre, le temps de calcul total est le même que celui de la méthode de blocage synchrone.
3-5-1. epoll Si le côté OS vérifie si chaque appel non bloquant est prêt, le côté application n'a pas à attendre ou à juger en boucle, donc en allouant le temps libre à d'autres traitements Vous pouvez améliorer l'efficacité.
Par conséquent, le système d'exploitation a encapsulé le changement d'état des E / S comme un événement. Par exemple, les événements lisibles, les événements inscriptibles, etc. Il a également fourni un module système permettant à l'application de recevoir des notifications d'événements. Ce module est «select». Via select
, l'application [" File Descriptor "](https://ja.wikipedia.org/wiki/%E3%83%95%E3%82%A1%E3%82%A4%E3%83%" AB% E8% A8% 98% E8% BF% B0% E5% AD% 90) et les fonctions de rappel peuvent être enregistrées. Lorsque l'état du descripteur de fichier change, select
appelle la fonction de rappel préenregistrée. Cette méthode est appelée multiplexage d'E / S.
«select» a été amélioré plus tard en tant que «poll» en raison de l'algorithme inefficace. De plus, le noyau BSD a été mis à niveau vers le module kqueue
et le noyau Linux a été mis à niveau vers le module ʻepoll. La fonctionnalité de ces quatre modules est la même et les API disponibles sont presque les mêmes. La différence est que
kqueue et ʻepoll
sont plus efficaces que les deux autres modules lors du traitement d'un grand nombre de descripteurs de fichiers.
Vous entendez souvent le module ʻepollà cause de l'utilisation répandue des serveurs Linux. En supposant que le nombre de descripteurs de fichier est $ N $,
select ・ poll peut être traité avec le montant de calcul du temps de $ O (N) $, tandis que ʻepoll
peut être traité avec $ O (1) $. De plus, ʻepoll` monte tous les descripteurs de fichier d'écoute avec un descripteur de fichier spécial. Ce descripteur de fichier peut être partagé par plusieurs threads.
J'ai laissé l'écoute des événements d'E / S au système d'exploitation. Lorsque l'état des E / S change (par exemple, lorsqu'une connexion socket est établie et que les données peuvent être envoyées), que doit faire le système d'exploitation ensuite? C'est un ** rappel **.
Ici, nous devons encapsuler l'envoi et la lecture des données dans des fonctions séparées. Lorsque ʻepoll écoute l'état
socketau nom de l'application, appelez la fonction d'envoi de requête HTTP lorsque l'état
socket est prêt à être écrit (la connexion est établie) dans ʻepoll
. Lorsque l'état de "socket" devient lisible (le client reçoit la réponse), appelez la fonction de traitement de la réponse. "
Utilisez ʻepoll` et les rappels pour refactoriser le code de scraping.
import socket
import time
from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ
class Crawler:
def __init__(self, path):
self.path = path
self.sock = None
self.response = b''
def fetch(self):
self.sock = socket.socket()
self.sock.setblocking(False)
try:
self.sock.connect(('example.com', 80))
except BlockingIOError:
pass
selector.register(self.sock.fileno(), EVENT_WRITE, self.connected)
def connected(self, key, mask):
selector.unregister(key.fd)
get = f'GET {self.path} HTTP/1.0\r\nHost: example.com\r\n\r\n'
self.sock.send(get.encode('ascii'))
selector.register(key.fd, EVENT_READ, self.read_response)
def read_response(self, key, mask):
global stopped
chunk = self.sock.recv(4096)
if chunk:
self.response += chunk
else:
selector.unregister(key.fd)
paths_todo.remove(self.path)
if not paths_todo:
stopped = True
La différence est que vous téléchargez 10 pages différentes au lieu de la même page. L'ensemble des chemins de page pour télécharger path_todo
sera défini ultérieurement. Jetons un coup d'œil aux améliorations ici.
Tout d'abord, les deux boucles «send ()» et «recv ()» ont disparu.
Ensuite, nous avons introduit le module selectors
et créé une instanceDefaultSelector
. La bibliothèque standard Python selectors
est une encapsulation de select / poll / epoll / kqueue
. DefaultSelector
sélectionnera le meilleur module en fonction du système d'exploitation. Toutes les versions de Linux 2.5.44 et supérieures sont ʻepoll`.
J'ai également enregistré une fonction de rappel à gérer après l'événement inscriptible ʻEVENT_WRITE et l'événement lisible ʻEVENT_READ
de socket
se sont produits.
La structure du code a été nettoyée et les notifications de blocage sont laissées au système d'exploitation. Cependant, pour télécharger 10 pages différentes, vous avez besoin de 10 instances Crawler
et 20 événements se produiront. Comment obtiendriez-vous l'événement qui vient de se produire à partir de selector
et exécuteriez le rappel correspondant?
La seule façon de résoudre le problème ci-dessus est d'adopter l'ancienne méthode. En d'autres termes, c'est une boucle. Accédez au module selector
et attendez qu'il vous indique quel événement s'est produit et quel rappel appeler. Cette boucle de notification d'événement en attente s'appelle une boucle d'événement (https://ja.wikipedia.org/wiki/%E3%82%A4%E3%83%99%E3%83%B3%E3%83] % 88% E3% 83% AB% E3% 83% BC% E3% 83% 97 #: ~: texte =% E3% 82% A4% E3% 83% 99% E3% 83% B3% E3% 83% 88 % E3% 83% AB% E3% 83% BC% E3% 83% 97% 20 (événement% 20loop)% E3% 80% 81,% E3% 80% 81% E3% 81% 9D% E3% 82% 8C % E3% 82% 89% E3% 82% 92% E3% 83% 87% E3% 82% A3% E3% 82% B9% E3% 83% 91% E3% 83% 83% E3% 83% 81% EF % BC% 88% E9% 85% 8D% E9% 80% 81% EF% BC% 89).
def loop():
while not stopped:
#Bloquer jusqu'à ce qu'un événement se produise
events = selector.select()
for event_key, event_mask in events:
callback = event_key.data
callback(event_key, event_mask)
Dans le code ci-dessus, une variable globale appelée «arrêté» contrôle le moment où la boucle d'événements s'arrête. Lorsque tous les path_todo
s sont consommés, remplacezarrêt
par Vrai
.
Et selector.select ()
est un appel bloquant. Si l'événement ne se produit pas ici, l'application n'a rien à gérer et doit être bloquée jusqu'à ce que l'événement se produise. Comme vous pouvez vous y attendre, lors du téléchargement d'une seule page Web, vous pourrez faire send ()
et recv ()
après connect ()
, donc l'efficacité du traitement est la même que la méthode de blocage synchrone. .. La raison est que même si vous ne bloquez pas avec connect ()
ou recv ()
, cela bloquera avec select ()
.
Ainsi, la fonction selector
(ci-après appelée ʻepoll / kqueue) est conçue pour résoudre les accès parallèles à grande échelle. La fonction
selector` est à son meilleur lorsqu'il y a un grand nombre d'appels non bloquants dans le système et que les événements peuvent être générés presque aléatoirement.
Le code ci-dessous a créé 10 tâches de téléchargement et lancé une boucle d'événements.
if __name__ == '__main__':
elapsed_times = 0
for _ in range(10):
selector = DefaultSelector()
stopped = False
paths_todo = {'/', '/1', '/2', '/3', '/4', '/5', '/6', '/7', '/8', '/9'}
start = time.time()
for path in paths_todo:
crawler = Crawler(path)
crawler.fetch()
loop()
elapsed_time = time.time() - start
elapsed_times += elapsed_time
print(f"elapsed_time: {(elapsed_time):.2f}[sec]")
print(f"mean_elapsed_time: {(elapsed_times/10):.2f}[sec]")
Résultat de l'exécution:
elapsed_time: 0.29[sec]
elapsed_time: 0.27[sec]
elapsed_time: 0.24[sec]
elapsed_time: 0.27[sec]
elapsed_time: 0.29[sec]
elapsed_time: 0.29[sec]
elapsed_time: 0.32[sec]
elapsed_time: 0.30[sec]
elapsed_time: 0.30[sec]
elapsed_time: 0.34[sec]
mean_elapsed_time: 0.29[sec]
Le temps moyen pour 10 fois est de 0,29 seconde. vous êtes fort. Résolution du problème du téléchargement simultané de 10 pages avec "boucle d'événement + rappel" dans un seul thread. Il s'agit de programmation asynchrone. Il y a une boucle for
, on dirait qu'elle crée des instances Crawler
en séquence et appelle la méthode fetch
, mais la méthode fetch
ne gère que connect ()
et l'enregistrement des événements. est. Et, en termes de temps d'exécution, les tâches de téléchargement ont clairement été effectuées en même temps.
Procédure de traitement asynchrone du code ci-dessus:
Crawler
fetch
pour créer une connexion socket
et enregistrer un événement inscriptible dans le selector
fetch
n'a pas de processus de blocage, alors revenez immédiatement se produit, rappelez sa méthode
connected` et quittez la première boucle.connecté
ʻEVENT_READ peut se produire, et ʻEVENT_WRITE
d'autres tâches peuvent se produire (à ce moment, le temps bloqué par une tâche est utilisé pour exécuter d'autres tâches).Nous avons vu de la méthode de blocage synchrone à la méthode non bloquante asynchrone. Et maintenant, vous pouvez faire de la magie noire pour traiter plusieurs tâches de blocage d'E / S réseau en parallèle dans un seul thread. Comparé au multi-threading, il ne change même pas de thread. L'exécution de rappel est un appel de fonction qui se termine dans la pile du thread. De plus, les performances sont également excellentes. Le nombre de tâches pouvant être traitées simultanément sur un seul serveur est passé de dizaines de milliers à des centaines de milliers.
C'est la fin de la prise en charge de la programmation asynchrone pour certains langages de programmation. Les ingénieurs doivent passer beaucoup de temps à utiliser directement ʻepoll` pour enregistrer les événements et les rappels, gérer les boucles d'événements et concevoir les rappels.
Comme vous pouvez le voir dans le contenu jusqu'à présent, quel que soit le langage que vous utilisez, si vous effectuez une programmation asynchrone, vous ne pouvez pas échapper au modèle "boucle d'événement + rappel" ci-dessus. Cependant, vous n'utilisez peut-être pas ʻepoll, et ce n'est peut-être pas une boucle
While`. Cependant, toutes sont des méthodes asynchrones du modèle "Je vous apprendrai plus tard".
Alors, pourquoi certaines programmations asynchrones ne voient-elles pas des modèles de rappel? Nous examinerons cela à l’avenir. Vous n'avez pas parlé de Corroutine, l'exécutif de la programmation asynchrone de Python.
De là, je vais expliquer comment l'écosystème de programmation asynchrone de Python a hérité du modèle "boucle d'événement + rappel" mentionné ci-dessus, puis voir comment il a évolué pour devenir le modèle de collout natif ʻasyncio`. Faisons le.
Dans [3. Asynchronous I / O Evolution Path](# 3-Asynchronous io Evolution Path), nous avons vu la structure de base de "boucle d'événement + rappel" qui réalise une programmation asynchrone avec un seul thread. Certes, "boucle d'événement + rappel" peut grandement améliorer l'efficacité du programme. Cependant, le problème n'est pas encore résolu. Le projet réel peut être très complexe, vous devez donc prendre en compte les problèmes suivants:
Dans un vrai projet, ces problèmes sont inévitables. Et derrière le problème se trouvent les lacunes du modèle de rappel.
def callback_1():
#En traitement
def callback_2():
#En traitement
def callback_3():
#En traitement
def callback_4():
#En traitement
def callback_5():
#En traitement
async_function(callback_5)
async_function(callback_4)
async_function(callback_3)
async_function(callback_2)
async_function(callback_1)
(Veuillez pardonner aux abonnés Lisp)
Lors de l'écriture d'un code de programmation synchrone, le traitement correspondant est normalement effectué de haut en bas.
do_a()
do_b()
Si do_b ()
dépend du résultat dedo_a ()
, et do_a () ʻest un appel asynchrone, nous ne savons pas quand le résultat de
do_a ()sera retourné. Le traitement suivant doit être passé à
do_a ()sous la forme d'un callback, ce qui garantit que
do_a ()sera terminé avant l'exécution de
do_b ()`.
do_a(do_b())
Et si tous les longs flux de traitement sont rendus asynchrones, il en sera ainsi.
do_a(do_b(do_c(do_d(do_e(do_f(...))))))
Le style ci-dessus est appelé "style Callback Hell". Cependant, le problème principal n'est pas l'apparence, mais la nécessité de changer la structure d'origine de haut en bas de l'extérieur vers l'intérieur. D'abord, do_a ()
, puis do_b ()
, puis do_c ()
, ..., et ensuite au plus profond do_f ()
. Dans le processus synchrone, do_b ()
après do_a ()
signifie que le pointeur d'instruction du thread contrôle le flux. Cependant, lorsqu'il s'agit de modèles de rappel, les contrôles de flux sont soigneusement placés par les ingénieurs.
L'objet sock
dans [3-1 Synchronous Version](# 3-1-Synchronous Blocking Method) est réutilisé de haut en bas, tandis que [3-5 Callback Version](# 3-5-Amélioration de la méthode non bloquante asynchrone) vous oblige à instancier la classe Crawler
et à stocker l'objet chaussette
dans self
. Sans un style de programmation orienté objet, vous auriez à passer l'état que vous devez partager à chaque fonction de rappel, comme une touche de bâton. Et vous devez planifier à l'avance et concevoir soigneusement les états à partager entre plusieurs appels asynchrones.
Une série de callbacks constitue une chaîne d'appels (“Method Chain”). Par exemple, il y a une chaîne de do_a ()
à do_f ()
. Que pensez-vous si do_d ()
provoque une erreur? La chaîne est cassée et vous perdez l'état de toucher le bâton. Et ["Stack Trace"](https://ja.wikipedia.org/wiki/%E3%82%B9%E3%82%BF%E3%83%83%E3%82%AF%E3%83% 88% E3% 83% AC% E3% 83% BC% E3% 82% B9) sera détruit. Par exemple, do_d ()
provoque une erreur, et l'appel à do_d ()
échoue dansdo_c ()
, doncdo_c () ʻ lui-même provoque également une erreur. De même,
do_b ()et
do_a () provoqueront une erreur, et le journal des erreurs signalera seulement que "l'appel à
do_a ()` a provoqué une erreur ". Cependant, c'est «do_d ()» qui a causé l'erreur. Pour éviter cela, vous devez détecter toutes les erreurs et renvoyer les données en tant que valeur de retour de la fonction. Et toutes les fonctions de rappel doivent vérifier la valeur de retour de la fonction précédente, ce qui empêche "Hiding Error".
Ainsi, alors que la lisibilité n'est qu'une question d'apparence, deux inconvénients, tels que la rupture des traces de pile et la difficulté à partager et à gérer l'état, contribuent à l'extraordinaire difficulté de la programmation asynchrone basée sur le rappel. Chaque langage de programmation essaie de résoudre ce problème. Merci à cela, "Promise", "Coroutine" Une solution telle que / wiki /% E3% 82% B3% E3% 83% AB% E3% 83% BC% E3% 83% 81% E3% 83% B3) est née.
Le modèle "boucle d'événement + rappel" a résolu les difficultés de la programmation asynchrone telles que "lorsque la tâche asynchrone est terminée" et "comment gérer la valeur de retour de l'appel asynchrone". Cependant, les rappels compliquent le programme. Avant que nous puissions penser aux moyens d'éviter cette lacune, nous devons d'abord clarifier les éléments essentiels. Pourquoi un rappel est-il obligatoire? Et quelle est la raison du partage et de la gestion de l'État, qui est l'un des inconvénients en premier lieu?
Le partage et la gestion de l'État sont nécessaires parce que le programme doit savoir ce qu'il a fait, ce qu'il fait et ce qu'il va faire. En d'autres termes, le programme a besoin de connaître son état actuel, et il doit également toucher à chaque rappel et le maintenir en place.
La gestion de l'état entre plusieurs rappels est difficile. Alors pourquoi ne pas laisser chaque callback gérer uniquement son propre état? La chaîne d'appels rend la gestion des erreurs difficile. Alors pourquoi ne pas utiliser une chaîne d'appels? Mais si vous n'utilisez pas de chaîne d'appels, comment la fonction appelée sait-elle si la fonction précédente est terminée? Alors qu'en est-il de laisser un rappel notifier le prochain rappel? En premier lieu, un rappel peut être considéré comme une tâche en attente.
Notification mutuelle entre les tâches, donnant à chaque tâche son propre état, c'est exactement l'ancienne pratique de programmation "Cooperative Multitasking". Mais vous devez le planifier dans un seul thread. De là, vous pouvez avoir un cadre de pile et connaître facilement votre statut ["Coroutine"](https://ja.wikipedia.org/wiki/%E3%82%B3%E3%83%AB% E3% 83% BC% E3% 83% 81% E3% 83% B3 #: ~: texte =% E3% 82% B3% E3% 83% AB% E3% 83% BC% E3% 83% 81% E3% 83% B3% EF% BC% 88% E8% 8B% B1% 3A% 20co% 2Droutine,% E5% 8B% 95% E4% BD% 9C% E3% 82% 92% E8% A1% 8C% E3% 81 % 86% E3% 81% 93% E3% 81% A8% E3% 81% AB% E3% 82% 88% E3% 82% 8B% E3% 80% 82) entre en jeu. Bien sûr, il est également possible de se notifier entre Koruchin.
Corroutine est ["Substitute"](https://ja.wikipedia.org/wiki/%E3%82%B5%E3%83%96%E3%83%AB%E3%83%BC%E3%83%81 C'est une généralisation de% E3% 83% B3). La stratégie de planification Coroutine est ["Non préemptive"](http://e-words.jp/w/%E3%83%8E%E3%83%B3%E3%83%97%E3%83%AA%E3 % 82% A8% E3% 83% B3% E3% 83% 97% E3% 83% 86% E3% 82% A3% E3% 83% 96% E3% 83% 9E% E3% 83% AB% E3% 83 % 81% E3% 82% BF% E3% 82% B9% E3% 82% AF.html #: ~: texte =% E3% 83% 8E% E3% 83% B3% E3% 83% 97% E3% 83 % AA% E3% 82% A8% E3% 83% B3% E3% 83% 97% E3% 83% 86% E3% 82% A3% E3% 83% 96% E3% 83% 9E% E3% 83% AB % E3% 83% 81% E3% 82% BF% E3% 82% B9% E3% 82% AF% E3% 81% A8% E3% 81% AF% E3% 80% 81% E4% B8% 80% E3 % 81% A4% E3% 81% AE% E5% 87% A6% E7% 90% 86% E8% A3% 85% E7% BD% AE, CPU% E3% 82% 92% E7% AE% A1% E7 % 90% 86% E3% 81% 97% E3% 81% AA% E3% 81% 84% E6% 96% B9% E5% BC% 8F% E3% 80% 82), avec entrées multiples Vous pouvez contrôler la suspension et la reprise.
Un sous-programme est un bloc de code appelable défini par un langage de programmation. En d'autres termes, c'est un ensemble d'instructions emballées pour réaliser une fonction. Dans les langages de programmation généraux, les sous-programmes sont réalisés par des structures telles que des fonctions et des méthodes.
Objet spécial en Python ["Generator"](https://ja.wikipedia.org/wiki/%E3%82%B8%E3%82%A7%E3%83%8D%E3%83%AC% E3% 83% BC% E3% 82% BF_ (% E3% 83% 97% E3% 83% AD% E3% 82% B0% E3% 83% A9% E3% 83% 9F% E3% 83% B3% E3 Il y a% 82% B0)). Les caractéristiques du générateur sont similaires à celles du coroutum, il peut être interrompu lors d'une itération et ne perd pas son état précédent jusqu'à l'itération suivante.
Le générateur a été amélioré en Python 2.5 (PEP 342) pour permettre des collouts simples dans le générateur. Le titre proposé pour cette amélioration est «Coroutines via des générateurs améliorés». Grâce à PEP 342, le générateur peut maintenant suspendre l'exécution à «yield» et renvoyer des données. Vous pouvez également utiliser send
pour envoyer des données au générateur, et throw
pour provoquer une erreur dans le générateur et le terminer.
Refactorisez ensuite le programme de grattage avec un collout basé sur un générateur.
Comment connaissez-vous le résultat d'un appel asynchrone si vous quittez la méthode de rappel? Ici, nous définissons d'abord l'objet. Lorsque le résultat de l'appel asynchrone est renvoyé, enregistrez-le. Cet objet est appelé l'objet «Future».
import socket
import time
from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ
class Future:
def __init__(self):
self.result = None
self._callbacks = []
def add_done_callback(self, fn):
self._callbacks.append(fn)
def set_result(self, result):
self.result = result
for fn in self._callbacks:
fn(self)
L'objet Future
a une variable d'instance result
et stocke les futurs résultats de l'exécution. Et la méthode set_result
définit le result
, et après avoir lié la valeur au result
, elle exécute la fonction de rappel pré-ajoutée à l'objet Future
. Les fonctions de rappel peuvent être ajoutées avec la méthode ʻadd_done_callback () `.
N'avez-vous pas dit que vous quittiez la méthode de rappel? Ne paniquez pas. Comme expliqué dans [Ici](# 3-5-4-Summary), si vous faites de la programmation asynchrone, vous ne pouvez pas échapper au modèle "boucle d'événement + rappel". Et le rappel ici est un peu différent du précédent.
Quoi qu'il en soit, j'ai créé un objet Future
qui représente les données futures. Refactorisons le code de scraping avec l'objet Future
.
class Crawler:
def __init__(self, path):
self.path = path
self.response = b''
def fetch(self):
sock = socket.socket()
sock.setblocking(False)
try:
sock.connect(('example.com', 80))
except BlockingIOError:
pass
f = Future()
def on_connected():
f.set_result(None)
# fileno()La méthode renvoie le descripteur de fichier de socket sous la forme d'un type entier court
selector.register(sock.fileno(), EVENT_WRITE, on_connected)
yield f
selector.unregister(sock.fileno())
get = f'GET {self.path} HTTP/1.0\r\nHost: example.com\r\n\r\n'
sock.send(get.encode('ascii'))
global stopped
while True:
f = Future()
def on_readable():
f.set_result(sock.recv(4096))
selector.register(sock.fileno(), EVENT_READ, on_readable)
#Recevoir le résultat envoyé
chunk = yield f
selector.unregister(sock.fileno())
if chunk:
self.response += chunk
else:
paths_todo.remove(self.path)
if not paths_todo:
stopped = True
break
Par rapport à la version de rappel précédente, cela fait beaucoup de différence. J'ai utilisé l'expression yield
pour la méthode fetch
et en ai fait un générateur. Le générateur peut être démarré une fois avec next ()
ou avec send (None)
, et sera interrompu quand il atteindra yield
. Alors, comment le générateur «fetch» redémarre-t-il?
Vous devez suivre une règle pour résoudre le problème ci-dessus. C'est le "principe de responsabilité unique" (https://en.wikipedia.org/wiki/Single-responsibility_principle). Par conséquent, nous allons créer ici quelque chose qui jouera le rôle de redémarrer le générateur et de gérer son état. Nommez-le «Tâche».
class Task:
def __init__(self, coro):
#Objet Corroutine
self.coro = coro
f = Future()
f.set_result(None)
self.step(f)
def step(self, future):
try:
#Une fois envoyé, le générateur fonctionnera jusqu'au prochain rendement
# next_future est l'objet retourné par yield
next_future = self.coro.send(future.result)
except StopIteration:
return
next_future.add_done_callback(self.step)
L'objet Task
dans le code ci-dessus contient un objet coroutine coro
. La tâche à gérer est une tâche en attente (coroutine), donc le coro
ici est le générateur fetch
. Et il y a une méthode step
, qui est exécutée une fois à l'initialisation. La méthode step
appelle la méthode send ()
du générateur, et à l'initialisation elle devientsend (None)
, donc l'itération initiale de coro
, c'est-à-direfetch ()
, est effectuée.
Une fois que send ()
est terminé, vous obtiendrez le prochain futur
et vous utiliserez ʻadd_done_callback pour ajouter une fonction de rappel appelée
step () au prochain
futur`.
Ensuite, jetons un œil au générateur fetch ()
. La logique métier telle que l'envoi d'une demande et la réception d'une réponse est effectuée en interne. Et la fonction de rappel enregistrée dans selector
est également devenue simple. Les deux «yield» retournent les «futurs» correspondants et les reçoivent dans «Task.step ()». Vous avez maintenant connecté avec succès Task
, Future
et Coroutine
.
Initialise l'objet Task
et fetch ()
s'exécute jusqu'au premier yield
. Alors, comment le récupérer?
La boucle d'événements est de retour. Lorsque vous atteignez le premier rendement, attendez que le ʻEVENT_WRITE` enregistré se produise. La boucle d'événement, comme un battement de cœur, continuera à se déplacer une fois qu'elle commence à pulser.
def loop():
while not stopped:
#Bloquer jusqu'à ce qu'un événement se produise
events = selector.select()
for event_key, event_mask in events:
callback = event_key.data
callback()
if __name__ == '__main__':
elapsed_times = 0
for _ in range(10):
selector = DefaultSelector()
stopped = False
paths_todo = {'/', '/1', '/2', '/3', '/4', '/5', '/6', '/7', '/8', '/9'}
start = time.time()
for path in paths_todo:
crawler = Crawler(path)
Task(crawler.fetch())
loop()
elapsed_time = time.time() - start
elapsed_times += elapsed_time
print(f"elapsed_time: {(elapsed_time):.2f}[sec]")
print(f"mean_elapsed_time: {(elapsed_times/10):.2f}[sec]")
Résultat de l'exécution:
elapsed_time: 0.30[sec]
elapsed_time: 0.28[sec]
elapsed_time: 0.28[sec]
elapsed_time: 0.29[sec]
elapsed_time: 0.28[sec]
elapsed_time: 0.25[sec]
elapsed_time: 0.26[sec]
elapsed_time: 0.26[sec]
elapsed_time: 0.28[sec]
elapsed_time: 0.26[sec]
mean_elapsed_time: 0.27[sec]
Le temps moyen pour 10 fois est de 0,27 seconde. Cette fois, la «boucle» est un peu différente d'avant. callback ()
ne reçoit plus ʻevent_key et event_mask. En d'autres termes, le rappel ici n'a pas besoin de savoir qui a déclenché l'événement, il peut être vu en combinaison avec
fetch (), mais le rappel définit simplement la valeur à
future avec
set_result ()`. Il vous suffit de l'installer. Et vous n'avez pas besoin de savoir de quel "futur" il s'agit, le collout peut sauvegarder son propre état, et ce n'est pas grave si vous connaissez votre "futur". Vous n'avez pas à vous soucier de définir une valeur, le collout gérera tout pour vous.
Style de rappel:
Style de collout du générateur:
selector
peut être séparé de la logique métier simplement en définissant une valeur dans future
.callback ()
in loop
n'a plus besoin de savoir qui a déclenché l'événementLe code est un peu difficile à lire. Ici, que dois-je faire si on me demande d'améliorer la résistance aux défauts et la fonctionnalité de «fetch»? De plus, la logique technique (liée à «socket») et la logique métier (traitement des demandes et des réponses) sont mélangées, ce qui n'est pas bon.
socket
peut être abstraite (fonction / méthode)socket.recv ()
dans la boucleCependant, il y a des yield
s ici, et si vous voulez les résumer, vous devez en faire des générateurs. De plus, fetch ()
lui-même est un générateur, et jouer avec le générateur à l'intérieur du générateur peut rendre le code encore plus compliqué.
Les concepteurs de Python ont également remarqué ce problème et ont fourni un jouet pour jouer avec le générateur dans un générateur appelé yield from
.
yield from
est une syntaxe introduite à partir de Python 3.3 (PEP 380). Le PEP 380 est principalement destiné à éliminer l'inconvénient de jouer avec le générateur à l'intérieur du générateur et a deux fonctions.
L'un d'eux est que vous n'avez pas à «céder» le sous-générateur en tournant l'itération, vous pouvez directement «céder» à partir de «. Les deux types de générateurs suivants sont équivalents en fonction.
def gen_1():
sub_gen = range(10)
yield from sub_gen
def gen_2():
subgen = range(10)
for item in subgen:
yield item
L'autre est la possibilité d'ouvrir des canaux d'intercommunication entre le sous-générateur et le générateur principal.
def gen():
yield from sub_gen()
def sub_gen():
while 1:
x = yield
yield x + 1
def main():
g = gen()
next(g) #Courez jusqu'au premier rendement
retval = g.send(1) # gen()Semble envoyer des données à, mais en fait sous_gen()Envoi à
print(retval) # sub_gen()Sort le 2 calculé à partir de
g.throw(StopIteration) # sub_gen()Causer une erreur à
Le code ci-dessus montre la fonction de communication mutuelle de «rendement de». yield from
ouvre un canal de communication dansgen ()
entresub_gen ()
etmain ()
. Les données «1» peuvent être envoyées directement de «main ()» à «sub_gen ()», et la valeur calculée «2» peut être renvoyée directement de «sub_gen ()» à «main ()». .. Vous pouvez également terminer sub_gen ()
en envoyant une erreur directement de main ()
à sub_gen ()
.
Au fait, «yield from» peut être non seulement «yield from
Connexion abstraite socket
def connect(sock, address):
f = Future()
sock.setblocking(False)
try:
sock.connect(address)
except BlockingIOError:
pass
def on_connected():
f.set_result(None)
selector.register(sock.fileno(), EVENT_WRITE, on_connected)
yield from f
selector.unregister(sock.fileno())
Chargement de la réponse abstraite
def read(sock):
f = Future()
def on_readable():
f.set_result(sock.recv(4096))
selector.register(sock.fileno(), EVENT_READ, on_readable)
chunk = yield from f
selector.unregister(sock.fileno())
return chunk
def read_all(sock):
response = []
chunk = yield from read(sock)
while chunk:
response.append(chunk)
chunk = yield from read(sock)
return b''.join(response)
Refactorisation de «Crawler»
class Crawler:
def __init__(self, path):
self.path = path
self.response = b''
def fetch(self):
global stopped
sock = socket.socket()
yield from connect(sock, ('example.com', 80))
get = f'GET {self.path} HTTP/1.0\r\nHost: example.com\r\n\r\n'
sock.send(get.encode('ascii'))
self.response = yield from read_all(sock)
paths_todo.remove(self.path)
if not paths_todo:
stopped = True
Le code jusqu'à ce point est bien. La partie réutilisable est abstraite en tant que fonction. La valeur du sous-générateur peut également être obtenue avec «rendement de». Cependant, une chose à noter est que nous utilisons «yield from» au lieu de «yield» lors du retour d'un objet «futur». yield
peut être appliqué aux objets Python ordinaires, mais pas yield from
. Maintenant, nous devons modifier le Future
pour en faire un objet ʻiterable`.
class Future:
def __init__(self):
self.result = None
self._callbacks = []
def add_done_callback(self, fn):
self._callbacks.append(fn)
def set_result(self, result):
self.result = result
for fn in self._callbacks:
fn(self)
def __iter__(self):
yield self
return self.result
Je viens d'ajouter «iter». Bien sûr, vous ne devez pas nécessairement utiliser yield from
, mais vous pouvez le laisser comme yield
. Cependant, il est préférable de les utiliser correctement pour distinguer s'ils ont des collouts basés sur des générateurs ou simplement des générateurs. Par conséquent, après l'introduction de yield from
de Python 3.3, il était déconseillé de créer des collouts avec yield
. Il y a aussi l'avantage de pouvoir envoyer librement des données entre des collouts en utilisant la fonction de communication mutuelle de «rendement de».
En améliorant le collout par "yield from", nous avons pu augmenter le niveau d'abstraction du code et simplifier davantage les questions liées à la logique métier. La fonction d'intercommunication facilite l'échange de données entre les coroutines. Avec cela, la programmation asynchrone Python a fait de grands progrès.
Et les développeurs du langage Python ont également profité pleinement de yield from
. Le framework de programmation asynchrone Python Tulip
, dirigé par Guido, a également évolué à une vitesse énorme, changeant son nom en ʻasyncio` dans Python 3.4 et l'adoptant temporairement comme bibliothèque standard (à titre provisoire).
4-6. asyncio
ʻAsyncio est un framework d'E / S asynchrones ([PEP 3156](https://www.python.org/dev/peps/pep-3156/)) introduit expérimentalement à partir de Python 3.4. ʻAsyncio
a été fourni en Python comme infrastructure pour la programmation d'E / S asynchrones par Cortine. Les composants de base se composent de Event Loop, Coroutine, Task, Future et d'autres modules auxiliaires.
Lorsque ʻasyncio a été introduit, un décorateur appelé
@ asyncio.coroutinea également été fourni. Vous pouvez le marquer comme collout en l'attachant à une fonction qui utilise
yield from`, mais vous n'êtes pas obligé de l'utiliser.
Avec l'aide de yield from
de Python 3.4, il est plus facile de créer des corouties, mais comme pour les problèmes historiques, les gens ont une distinction et une relation entre ** générateurs ** et ** coroutines **. Je ne comprends pas. Et je ne connais pas la différence entre "yield" et "yield from". Cette confusion viole les règles de "Python Zen".
Ainsi, à partir de Python 3.5, les concepteurs de Python ont ajouté à la hâte la syntaxe ʻasync / await ([PEP 492](https://www.python.org/dev/peps/pep-0492/)). , A montré un support explicite pour Corroutine. C'est ce qu'on appelle un ** collout natif **. Les deux styles de collout ʻasync / await
et yield from
ont la même implémentation interne et sont compatibles l'un avec l'autre.
Et à partir de Python 3.6, ʻasyncio` a officiellement rejoint la bibliothèque standard. Ce qui précède est la trajectoire évolutive des E / S asynchrones dans CPython.
Découvrez la commodité de la syntaxe ʻasyncio et ʻasync / await
.
import asyncio
import aiohttp
import time
loop = asyncio.get_event_loop()
async def fetch(path):
async with aiohttp.ClientSession(loop=loop) as session:
async with session.get(path) as response:
response = await response.read()
return response
if __name__ == '__main__':
host = 'http://example.com'
paths_todo = {'/', '/1', '/2', '/3', '/4', '/5', '/6', '/7', '/8', '/9'}
elapsed_times = 0
for _ in range(10):
start = time.time()
tasks = [fetch(host + path) for path in paths_todo]
loop.run_until_complete(asyncio.gather(*tasks))
elapsed_time = time.time() - start
elapsed_times += elapsed_time
print(f"elapsed_time: {(elapsed_time):.2f}[sec]")
print(f"mean_elapsed_time: {(elapsed_times/10):.2f}[sec]")
Résultat de l'exécution:
elapsed_time: 0.27[sec]
elapsed_time: 0.25[sec]
elapsed_time: 0.25[sec]
elapsed_time: 0.26[sec]
elapsed_time: 0.26[sec]
elapsed_time: 0.25[sec]
elapsed_time: 0.28[sec]
elapsed_time: 0.25[sec]
elapsed_time: 0.25[sec]
elapsed_time: 0.25[sec]
mean_elapsed_time: 0.26[sec]
Comparé au style collout basé sur un générateur, le style ʻasyncio` est assez différent.
yield
et yield from
disparaissent et ʻasync / await` est utilisé à la placeloop ()
disparaît et ʻasyncio.get_event_loop () `est utilisée à la placesocket
, la bibliothèque ʻaio http` le fera pour vousLa raison pour laquelle vous n'utilisez pas «socket» par vous-même pour envoyer des requêtes HTTP et recevoir des réponses est qu'il est extrêmement difficile de bien gérer le protocole HTTP dans les affaires réelles, et si vous avez un client HTTP asynchrone avec toutes les fonctionnalités, vous pouvez le faire vous-même. Parce que tu n'es pas obligé.
Par rapport à la version de blocage synchrone du code:
Nous avons examiné de plus près l'évolution et la mécanique de la programmation asynchrone en Python. Au final, nous avons atteint N fois plus d'efficacité avec un code aussi simple qu'un traitement synchrone. Et il n'a pas les inconvénients de l'enfer du rappel.
Plus de détails sur la façon d'utiliser ʻasyncio, ses forces et faiblesses, et comment il le distingue des autres solutions d'E / S asynchrones au sein de l'écosystème Python, ʻasyncio
, seront discutés dans un autre article.
A Web Crawler With asyncio Coroutines Latency numbers every programmer should know
Recommended Posts