Compréhension complète de la programmation asynchrone Python

Programmation asynchrone

Je suis sûr que beaucoup de gens ont entendu parler de la programmation asynchrone. Par exemple, le JavaScript utilisé dans le frontal est un langage à un seul thread qui ne bloque pas le thread principal, de sorte que diverses fonctions sont implémentées pour être asynchrones. Node.js hérite également de cette propriété et convient parfaitement aux tâches liées aux E / S. Cependant, en ce qui concerne Python, il prend en charge le traitement parallèle et parallèle, de sorte que la plupart des gens n'auront pas d'expérience en matière de programmation asynchrone dans leurs propres projets. Bien sûr, Tornado, Twisted et [Gevent](http: //www.gevent. Beaucoup de gens ont utilisé des frameworks asynchrones tels que org /) parce qu'ils sont célèbres, mais lorsque vous rencontrez des erreurs étranges, vous ne pourrez pas les résoudre.

Comme vous pouvez le voir d'après les tendances récentes de PyCon, la programmation asynchrone est sans doute la prochaine tendance de l'écosystème Python. En outre, les langages de programmation émergents tels que Go et Rust vendent un traitement asynchrone et un traitement parallèle hautes performances. Puisque Python ne doit pas être vaincu, Guido, le créateur de Python, a commencé à développer lui-même le projet Tulip (asyncio) en 2013.

1. Qu'est-ce que le traitement asynchrone?

Tout d'abord, je voudrais expliquer le concept associé, puis le traitement asynchrone.

1-1. Blocage

Le blocage est partout. Par exemple, le processeur est "[Context Switch](https://ja.wikipedia.org/wiki/%E3%82%B3%E3%83%B3%E3%83%86%E3%82%AD%E3%" 82% B9% E3% 83% 88% E3% 82% B9% E3% 82% A4% E3% 83% 83% E3% 83% 81 #: ~: texte =% E3% 82% B3% E3% 83% B3% E3% 83% 86% E3% 82% AD% E3% 82% B9% E3% 83% 88% E3% 82% B9% E3% 82% A4% E3% 83% 83% E3% 83% 81% 20 (contexte% 20 interrupteur)% 20% E3% 81% A8,% E4% B8% 8D% E5% 8F% AF% E6% AC% A0% E3% 81% AA% E6% A9% 9F% E8% 83% Lors de l'exécution de "BD% E3% 81% A7% E3% 81% 82% E3% 82% 8B% E3% 80% 82)", les autres processus ne peuvent pas traiter et sont bloqués. (Dans le cas du multi-core, le core avec le changement de contexte devient indisponible)

1-2. Non bloquant

Le non-blocage est le côté opposé du blocage, et le blocage d'un certain processus réduit l'efficacité du calcul, de sorte que le processus est rendu non bloquant.

1-3. Traitement synchrone

1-4. Traitement asynchrone

La méthode de communication ci-dessus fait référence à "primitive de synchronisation" dans le traitement asynchrone et le traitement parallèle. Par exemple, semapho, lock, file d'attente de synchronisation, etc. Ces méthodes de communication permettent de synchroniser plusieurs programmes sous certaines conditions. Et comme il existe un traitement asynchrone, ces méthodes de communication sont nécessaires. La raison en est que si tous les programmes effectuent un traitement synchrone, ils sont traités dans l'ordre depuis le début, aucune méthode de communication n'est donc requise.

1-5. Traitement parallèle

1-6. Traitement parallèle

Pour le traitement parallèle / parallèle, reportez-vous à Article précédent.

1-7. Résumé du concept

Le programme doit être divisé en plusieurs tâches pour prendre en charge la concurrence. Bloquant / non bloquant et synchrone / asynchrone sont définis pour chaque tâche. Par conséquent, parallèle, asynchrone et non bloquant sont étroitement liés.

1-8. Programmation asynchrone

Synchrone / asynchrone et bloquant / non bloquant ne sont pas incompatibles. Par exemple, un site EC effectue un traitement asynchrone pour les demandes d'accès multi-utilisateurs, mais les mises à jour d'inventaire doivent être un traitement synchrone.

1-9. Difficultés de la programmation asynchrone

En conséquence, la plupart des frameworks asynchrones simplifient le modèle de programmation asynchrone (n'autorisant qu'un seul événement à la fois). Les discussions sur la programmation asynchrone se concentrent sur celles à thread unique.

Par conséquent, si vous adoptez la programmation asynchrone, vous devez réduire la taille de chaque appel asynchrone. «Petit» signifie ici raccourcir le temps de calcul. Et comment diviser le programme en tâches asynchrones devient un défi.

2. Raisons de l'adoption de la programmation asynchrone

Comme mentionné précédemment, la programmation asynchrone présente de nombreux inconvénients. "Asyncio", que le créateur de Python a créé par lui-même pendant quatre ans, est devenu une bibliothèque standard dans Python 3.6. Mais pourquoi devons-nous rendre les choses si difficiles? La raison en est que la programmation asynchrone est très utile.

2-1. Détection du temps CPU

En supposant que le nombre d'horloges du processeur est de «2,6 Ghz», c'est-à-dire qu'il est possible de traiter des instructions de 2,6 $ \ times10 ^ 9 $ par seconde, et le temps requis pour chaque traitement d'instruction est de «0,38 ns». «0.38ns» est la plus petite unité dans le sens du temps de la CPU. Et que la plus petite unité du sens humain du temps soit «1». Le tableau ci-dessous est un calcul du sens du temps du CPU basé sur le sens humain du temps. En outre, les données de retard CPU se réfèrent aux "Numéros de latence que tout programmeur devrait connaître".

En traitement Délai réel Sens du temps CPU
Traitement des instructions 0.38ns 1s
Lire le cache L1 0.5ns 1.3s
Correction de la prédiction de branche 5ns 13s
Lire le cache L2 7ns 18.2s
contrôle d'exclusion 25ns 1m5s
Référence mémoire 100ns 4m20s
Changement de contexte 1.5µs 1h5m
Télécharger 2 Ko de données sur un réseau 1 Gbps 20µs 14.4h
Lire 1 Mo de données continues de la mémoire 250µs 7.5day
Ping l'hôte du même centre de données Internet (aller-retour) 0.5ms 15day
Lire 1 Mo de données en continu à partir du SSD 1ms 1month
Lire 1 Mo de données en continu sur le disque dur 20ms 20month
Ping un hôte dans une autre préfecture (aller-retour) 150ms 12.5year
Redémarrez la machine virtuelle 4s 300year
Redémarrez le serveur 5m 25000year

Le processeur est le cœur de traitement de l'ordinateur et constitue une ressource précieuse. Si vous perdez du temps d'exécution du processeur et réduisez l'utilisation, votre programme deviendra inévitablement moins efficace. Comme le montre le tableau ci-dessus, télécharger 2 Ko de données sur un réseau de 1 Gbit / s équivaut à passer 14 heures au sens du processeur. S'il s'agit d'un réseau à 10 Mbps, l'efficacité sera 100 fois inférieure. Le fait de simplement faire attendre le CPU pendant ce temps et de ne pas le transférer vers un autre traitement est juste un gaspillage de la «jeunesse» du CPU.

2-2. De vrais problèmes

Si un programme ne fait pas bon usage des ressources informatiques, il aura besoin de plus d'ordinateurs pour combler le vide. Par exemple, la refonte d'un programme de scraping avec une programmation asynchrone peut réduire les 7 serveurs requis à l'origine à 3 et réduire les coûts de 57%. À propos, sur AWS, une instance réservée de m4.xlarge coûte environ 150 000 yens par an.

Si vous ne vous souciez pas de l'argent, vous vous souciez vraiment de l'efficacité. Si vous augmentez le nombre de serveurs à un certain nombre, vous devez améliorer l'architecture et la conception du programme, ou même si vous l'augmentez davantage, les performances risquent de ne pas s'améliorer. Et les coûts de gestion seront considérablement augmentés. Lors d'une réservation pour PS5 ou XBOX Series X, il est possible que le site EC donne une erreur 503 car il s'agit d'un problème architectural plutôt que du nombre de serveurs.

"Problème C10K" a été soumis en 1999 et comment le faire 1Ghz C'est comme un défi de pouvoir fournir un service FTP à 10000 clients en même temps avec un seul serveur dans un environnement réseau de CPU, de mémoire 2G et de 1 Gbps. Depuis 2010, en raison de l'amélioration des performances du matériel, le "problème C10M" a été soumis après C10K. Le problème C10M est un problème qui gère 1 million d'accès simultanés par seconde dans un environnement réseau avec 8 cœurs de processeur, 64 Go de mémoire et 10 Gbps.

Le coût et l'efficacité sont des problèmes du point de vue de la gestion d'entreprise, et le problème C10K / C10M pose un défi technique au matériel. Si le problème C10K / C10M peut être résolu, les problèmes de coût et d'efficacité seront résolus en même temps.

2-3. Solution

Le processeur est très rapide, mais le changement de contexte, la lecture de la mémoire, la lecture du disque dur et la communication réseau sont très lents. En d'autres termes, loin du CPU, tout sauf le cache L1 est lent. Un ordinateur se compose de cinq périphériques principaux: périphérique d'entrée, périphérique de sortie, périphérique de stockage, périphérique de contrôle et périphérique de calcul. Le périphérique de contrôle et le périphérique de calcul se trouvent dans le CPU, mais les autres sont tous des E / S. La lecture et l'écriture de la mémoire, la lecture et l'écriture du disque dur, ainsi que la lecture et l'écriture sur la carte d'interface réseau sont toutes des E / S. Les E / S sont le plus gros goulot d'étranglement.

Les programmes asynchrones peuvent être plus efficaces, mais le plus gros goulot d'étranglement est les E / S, donc la solution est "E / S asynchrones". Il s'appelle 9D% 9E% E5% 90% 8C% E6% 9C% 9FIO).

3. Le chemin de l'évolution des E / S asynchrones

Le plus grand programme de la planète est probablement Internet. Et dans le tableau de [CPU time sense](# 2-1-cpu time sense), vous pouvez voir que les E / S réseau sont plus lentes que les E / S du disque dur et constituent le plus gros goulot d'étranglement. Divers frameworks asynchrones sont destinés aux E / S réseau, car rien n'est plus lent que les E / S réseau, à l'exception du redémarrage du serveur.

Prenons le grattage comme exemple. Ici, nous allons télécharger 10 pages Web sur le net.

3-1. Méthode de blocage synchrone

La méthode la plus simple consiste à télécharger dans l'ordre. Il est exécuté dans l'ordre d'établissement de la connexion «socket», de la transmission de la demande et de la réception de la réponse.

import socket
import time


def blocking_way():
    sock = socket.socket()
    # blocking
    sock.connect(('example.com', 80))
    request = 'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n'
    sock.send(request.encode('ascii'))
    response = b''
    chunk = sock.recv(4096)
    while chunk:
        response += chunk
        # blocking
        chunk = sock.recv(4096)
    return response


def sync_way():
    res = []
    for i in range(10):
        res.append(blocking_way())
    return len(res)


if __name__ == '__main__':
    elapsed_times = 0
    for _ in range(10):
        start = time.time()
        sync_way()
        elapsed_time = time.time() - start
        elapsed_times += elapsed_time
        print(f"elapsed_time: {(elapsed_time):.2f}[sec]")
    print(f"mean_elapsed_time: {(elapsed_times/10):.2f}[sec]")

Résultat de l'exécution:

elapsed_time: 2.76[sec]
elapsed_time: 2.58[sec]
elapsed_time: 2.56[sec]
elapsed_time: 2.58[sec]
elapsed_time: 2.85[sec]
elapsed_time: 2.66[sec]
elapsed_time: 2.60[sec]
elapsed_time: 3.38[sec]
elapsed_time: 2.88[sec]
elapsed_time: 2.67[sec]
mean_elapsed_time: 2.75[sec]

Le temps moyen pour 10 fois est de 2,75 secondes. La fonction blocking_way () établit une connexion socket, envoie une requête HTTP, lit la réponse HTTP à partir de la socket et renvoie les données. Et la fonction sync_way () vient de la répéter 10 fois. Dans le code ci-dessus, sock.connect (('example.com', 80)) envoie une requête au serveur numéro 80, et sock.recv (4096) ʻest 4 Ko de données d'octets de socket`. Lire.

Lorsqu'une connexion réseau est établie n'est pas déterminée par le côté client, mais par l'environnement réseau et la puissance de traitement du serveur. Et il est imprévisible quand les données seront renvoyées par le serveur. Par conséquent, par défaut, «sock.connect ()» et «sock.recv ()» sont bloqués. D'autre part, sock.send () est long et ne bloque pas. sock.send () retourne la valeur de retour dès que les données de la requête sont copiées dans le tampon de la pile de protocoles TCP / IP, donc il n'attend pas une réponse du serveur.

Si l'environnement réseau est très mauvais et qu'il faut 1 seconde pour établir une connexion, sock.connect () bloquera pendant 1 seconde. Cette seconde ressemble à 83 ans pour un processeur à 2,6 GHz. Au cours des 83 dernières années, le CPU n'a rien pu faire. De même, sock.recv () doit attendre que le client reçoive une réponse du serveur. Après avoir téléchargé la page d'accueil example.com 10 fois, répétez ce blocage 10 fois. Mais qu'en est-il du scraping à grande échelle, qui télécharge 10 millions de pages Web par jour?

En résumé, les E / S réseau, telles que le blocage synchrone, sont très inefficaces, en particulier parmi les programmes qui communiquent fréquemment. Une telle méthode ne peut pas résoudre C10K / C10M.

3-2. Amélioration de la méthode de blocage synchrone: méthode multi-processus

S'il faut du temps pour exécuter le même programme 10 fois, vous pouvez exécuter le même programme 10 fois en même temps. Par conséquent, nous allons introduire un multi-processus. À propos, dans le système d'exploitation avant Linux 2.4, le processus est l'entité de la tâche, et le système d'exploitation a été conçu pour être orienté processus.

import socket
import time
from concurrent.futures import ProcessPoolExecutor


def blocking_way():
    sock = socket.socket()
    # blocking
    sock.connect(('example.com', 80))
    request = 'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n'
    sock.send(request.encode('ascii'))
    response = b''
    chunk = sock.recv(4096)
    while chunk:
        response += chunk
        # blocking
        chunk = sock.recv(4096)
    return response


def multi_process_way():
    with ProcessPoolExecutor(max_workers=10) as executor:
        futures = {executor.submit(blocking_way) for i in range(10)}
    return len([future.result() for future in futures])


if __name__ == '__main__':
    elapsed_times = 0
    for _ in range(10):
        start = time.time()
        multi_process_way()
        elapsed_time = time.time() - start
        elapsed_times += elapsed_time
        print(f"elapsed_time: {(elapsed_time):.2f}[sec]")
    print(f"mean_elapsed_time: {(elapsed_times/10):.2f}[sec]")

Résultat de l'exécution:

elapsed_time: 0.49[sec]
elapsed_time: 0.47[sec]
elapsed_time: 0.48[sec]
elapsed_time: 0.49[sec]
elapsed_time: 0.54[sec]
elapsed_time: 0.51[sec]
elapsed_time: 0.56[sec]
elapsed_time: 0.52[sec]
elapsed_time: 0.47[sec]
elapsed_time: 0.47[sec]
mean_elapsed_time: 0.50[sec]

Le temps moyen pour 10 fois est de 0,50 seconde. C'était efficace. Cependant, le problème est qu'il ne s'agit pas d'un dixième de la méthode de blocage synchrone. La raison en est que le processeur de l'environnement d'exécution n'est pas de 10 cœurs et qu'il est nécessaire de changer de processus.

Les processus de commutation ne sont pas aussi bon marché que le changement de contexte du processeur indiqué dans [CPU Time Sense](# 2-1-cpu Time Sense). Lorsque le processeur passe d'un processus à un autre, il enregistre d'abord tous les états de registre et de mémoire de l'exécution du processus d'origine, puis restaure l'état enregistré de l'autre processus. Pour le CPU, c'est comme attendre quelques heures. Lorsque le nombre de processus est supérieur au nombre de cœurs de processeur, il est nécessaire de changer de processus.

Outre la commutation, le multi-processus présente un autre inconvénient. Comme un serveur normal fonctionne dans un état stable, le nombre de processus pouvant être traités simultanément est limité à des dizaines à des centaines. Trop de processus peuvent rendre votre système instable et manquer de ressources mémoire.

Et en plus de la commutation et à petite échelle, le multi-processus présente des problèmes tels que le partage d'état et de données.

3-3. Poursuite de l'amélioration de la méthode de blocage synchrone: méthode multi-thread

La structure de données des threads est plus légère que celle des processus et vous pouvez avoir plusieurs threads dans un processus. Le système d'exploitation plus récent que Linux 2.4 a également changé l'unité minimum planifiable d'un processus à l'autre. Le processus existe simplement en tant que conteneur pour les threads et joue désormais le rôle de gestion des ressources. Les threads au niveau du système d'exploitation sont distribués à chaque cœur de la CPU et peuvent être exécutés simultanément.

import socket
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_way():
    sock = socket.socket()
    # blocking
    sock.connect(('example.com', 80))
    request = 'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n'
    sock.send(request.encode('ascii'))
    response = b''
    chunk = sock.recv(4096)
    while chunk:
        response += chunk
        # blocking
        chunk = sock.recv(4096)
    return response


def multi_thread_way():
    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = {executor.submit(blocking_way) for i in range(10)}
    return len([future.result() for future in futures])


if __name__ == '__main__':
    elapsed_times = 0
    for _ in range(10):
        start = time.time()
        multi_thread_way()
        elapsed_time = time.time() - start
        elapsed_times += elapsed_time
        print(f"elapsed_time: {(elapsed_time):.2f}[sec]")
    print(f"mean_elapsed_time: {(elapsed_times/10):.2f}[sec]")

Résultat de l'exécution:

elapsed_time: 0.31[sec]
elapsed_time: 0.32[sec]
elapsed_time: 0.30[sec]
elapsed_time: 0.31[sec]
elapsed_time: 0.31[sec]
elapsed_time: 0.26[sec]
elapsed_time: 0.30[sec]
elapsed_time: 0.31[sec]
elapsed_time: 0.30[sec]
elapsed_time: 0.27[sec]
mean_elapsed_time: 0.30[sec]

Le temps moyen pour 10 fois est de 0,30 seconde. Comme prévu, c'était plus rapide que le multi-processus. Le multithreading semble résoudre le problème de la lenteur de la commutation de processus multiprocessus, et l'échelle des tâches pouvant être traitées simultanément est passée de centaines à des milliers de multiprocessus.

Cependant, il existe également un problème avec le multithreading. Premièrement, le multithreading de Python est ["GIL"](https://ja.wikipedia.org/wiki/%E3%82%B0%E3%83%AD%E3%83%BC%E3%83%90% E3% 83% AB% E3% 82% A4% E3% 83% B3% E3% 82% BF% E3% 83% 97% E3% 83% AA% E3% 82% BF% E3% 83% AD% E3% Avec la présence de 83% 83% E3% 82% AF), il y a un problème en ce que l'avantage du processeur multicœur ne peut pas être utilisé. Un seul thread est autorisé à être actif à un moment donné dans un processus Python. Alors, pourquoi le multithreading était-il plus rapide que le multiprocessus?

La raison en est que lors de l'appel d'un appel système bloquant comme sock.connect (), sock.recv (), le thread actuel libère le GIL, donnant aux autres threads une chance de s'exécuter. Cependant, si vous êtes dans un seul thread, l'appel système de blocage sera bloqué tel quel.

Connaissance des haricots: «Time.sleep» de Python est un processus de blocage, mais dans la programmation multithread, «time.sleep ()» ne bloque pas les autres threads.

Outre GIL, il existe des problèmes courants avec le multithreading. Les threads sont planifiés sur le système d'exploitation et leur "stratégie de planification" est ["préemption"](https://ja.wikipedia.org/wiki/%E3%83%97%E3%83%AA%E3 % 82% A8% E3% 83% B3% E3% 83% 97% E3% 82% B7% E3% 83% A7% E3% 83% B3), les threads de même priorité s'exécutent avec des chances égales Est garanti d'être. La préemption est une stratégie du premier arrivé, premier servi, il est donc imprévisible quel thread sera exécuté et quel code sera exécuté à la prochaine fois, et ["état de la concurrence"](https://ja.wikipedia.org/ Il peut s'agir de wiki /% E7% AB% B6% E5% 90% 88% E7% 8A% B6% E6% 85% 8B).

Par exemple, lorsqu'un thread de travail de scraping interroge l'URL suivante à extraire de la file d'attente des tâches, la question est de savoir laquelle passer si plusieurs threads viennent interroger en même temps. Par conséquent, des verrous et des files d'attente de synchronisation sont nécessaires pour empêcher la même tâche d'être exécutée plusieurs fois.

En outre, le multithreading peut gérer des centaines à des milliers de tâches multitâches simultanément, mais il est toujours insuffisant pour les systèmes Web importants et fréquents. Bien sûr, le plus gros problème avec le multithreading reste le conflit.

3-4. Méthode non bloquante asynchrone Enfin, nous avons atteint la méthode non bloquante. Voyons d'abord comment fonctionne le non-bloquant le plus primitif.

import socket
import time


def nonblocking_way():
    sock = socket.socket()
    sock.setblocking(False)
    #Parce que socket envoie une erreur lors de l'envoi d'une demande de connexion non bloquante
    try:
        sock.connect(('example.com', 80))
    except BlockingIOError:
        pass
    request = 'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n'
    data = request.encode('ascii')
    #Répéter l'envoi car le socket ne peut pas prédire quand une connexion sera établie
    while 1:
        try:
            sock.send(data)
            break
        except OSError:
            pass

    response = b''
    #Répéter la réception car elle est imprévisible lorsque la réponse peut être lue
    while 1:
        try:
            chunk = sock.recv(4096)
            while chunk:
                response += chunk
                # blocking
                chunk = sock.recv(4096)
            break
        except OSError:
            pass
    return response


def sync_way():
    res = []
    for i in range(10):
        res.append(nonblocking_way())
    return len(res)


if __name__ == '__main__':
    elapsed_times = 0
    for _ in range(10):
        start = time.time()
        sync_way()
        elapsed_time = time.time() - start
        elapsed_times += elapsed_time
        print(f"elapsed_time: {(elapsed_time):.2f}[sec]")
    print(f"mean_elapsed_time: {(elapsed_times/10):.2f}[sec]")

Résultat de l'exécution:

elapsed_time: 2.71[sec]
elapsed_time: 2.82[sec]
elapsed_time: 2.73[sec]
elapsed_time: 2.69[sec]
elapsed_time: 2.82[sec]
elapsed_time: 2.72[sec]
elapsed_time: 2.51[sec]
elapsed_time: 2.65[sec]
elapsed_time: 2.75[sec]
elapsed_time: 3.50[sec]
mean_elapsed_time: 2.79[sec]

Le temps moyen pour 10 fois est de 2,79 secondes. Je me sens trompé. Le temps de calcul est du même niveau que le blocage synchrone, mais le code est plus compliqué. Vous pensez peut-être que vous n'avez pas besoin de non-blocage.

Premièrement, dans le code ci-dessus, sock.setblocking (False) a demandé au système d'exploitation de changer l'appel système bloquant pour socket en non bloquant. Comme mentionné précédemment, le non-blocage, c'est que lorsque vous faites une chose, cela n'empêche pas le programme qui vous a appelé de faire l'autre. Le code ci-dessus ne se bloque certainement plus après avoir exécuté sock.connect () et sock.recv ().

Et le code est compliqué car il ne bloque plus. Lorsque connect () est appelé, le système d'exploitation déclenche d'abord une erreur, vous devez donc l'attraper avec try ici. Et ici, sans blocage, on passe immédiatement au code ci-dessous.

Je répète l'instruction while et exécute send () carconnect ()devient non bloquant et je ne sais pas quand la connexion sera établie, je dois donc continuer à essayer. Et même si send () est exécuté, comme on ne sait pas quand la réponse viendra, l'appel à recv () est également exécuté à plusieurs reprises.

connect () et recv () ne bloquent plus le programme principal, mais le temps CPU disponible n'est pas utilisé efficacement. J'ai passé ce temps à répéter la lecture et l'écriture de «socket» et à gérer les erreurs dans la boucle «while».

Et comme 10 téléchargements sont toujours exécutés dans l'ordre, le temps de calcul total est le même que celui de la méthode de blocage synchrone.

3-5. Amélioration de la méthode non bloquante asynchrone

3-5-1. epoll Si le côté OS vérifie si chaque appel non bloquant est prêt, le côté application n'a pas à attendre ou à juger en boucle, donc en allouant le temps libre à d'autres traitements Vous pouvez améliorer l'efficacité.

Par conséquent, le système d'exploitation a encapsulé le changement d'état des E / S comme un événement. Par exemple, les événements lisibles, les événements inscriptibles, etc. Il a également fourni un module système permettant à l'application de recevoir des notifications d'événements. Ce module est «select». Via select, l'application [" File Descriptor "](https://ja.wikipedia.org/wiki/%E3%83%95%E3%82%A1%E3%82%A4%E3%83%" AB% E8% A8% 98% E8% BF% B0% E5% AD% 90) et les fonctions de rappel peuvent être enregistrées. Lorsque l'état du descripteur de fichier change, select appelle la fonction de rappel préenregistrée. Cette méthode est appelée multiplexage d'E / S.

«select» a été amélioré plus tard en tant que «poll» en raison de l'algorithme inefficace. De plus, le noyau BSD a été mis à niveau vers le module kqueue et le noyau Linux a été mis à niveau vers le module ʻepoll. La fonctionnalité de ces quatre modules est la même et les API disponibles sont presque les mêmes. La différence est que kqueue et ʻepoll sont plus efficaces que les deux autres modules lors du traitement d'un grand nombre de descripteurs de fichiers.

Vous entendez souvent le module ʻepollà cause de l'utilisation répandue des serveurs Linux. En supposant que le nombre de descripteurs de fichier est $ N $,select ・ poll peut être traité avec le montant de calcul du temps de $ O (N) $, tandis que ʻepoll peut être traité avec $ O (1) $. De plus, ʻepoll` monte tous les descripteurs de fichier d'écoute avec un descripteur de fichier spécial. Ce descripteur de fichier peut être partagé par plusieurs threads.

3-5-2. Rappel

J'ai laissé l'écoute des événements d'E / S au système d'exploitation. Lorsque l'état des E / S change (par exemple, lorsqu'une connexion socket est établie et que les données peuvent être envoyées), que doit faire le système d'exploitation ensuite? C'est un ** rappel **.

Ici, nous devons encapsuler l'envoi et la lecture des données dans des fonctions séparées. Lorsque ʻepoll écoute l'état socketau nom de l'application, appelez la fonction d'envoi de requête HTTP lorsque l'étatsocket est prêt à être écrit (la connexion est établie) dans ʻepoll. Lorsque l'état de "socket" devient lisible (le client reçoit la réponse), appelez la fonction de traitement de la réponse. "

Utilisez ʻepoll` et les rappels pour refactoriser le code de scraping.

import socket
import time
from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ


class Crawler:
    def __init__(self, path):
        self.path = path
        self.sock = None
        self.response = b''

    def fetch(self):
        self.sock = socket.socket()
        self.sock.setblocking(False)
        try:
            self.sock.connect(('example.com', 80))
        except BlockingIOError:
            pass
        selector.register(self.sock.fileno(), EVENT_WRITE, self.connected)

    def connected(self, key, mask):
        selector.unregister(key.fd)
        get = f'GET {self.path} HTTP/1.0\r\nHost: example.com\r\n\r\n'
        self.sock.send(get.encode('ascii'))
        selector.register(key.fd, EVENT_READ, self.read_response)

    def read_response(self, key, mask):
        global stopped
        chunk = self.sock.recv(4096)
        if chunk:
            self.response += chunk
        else:
            selector.unregister(key.fd)
            paths_todo.remove(self.path)
            if not paths_todo:
                stopped = True

La différence est que vous téléchargez 10 pages différentes au lieu de la même page. L'ensemble des chemins de page pour télécharger path_todo sera défini ultérieurement. Jetons un coup d'œil aux améliorations ici.

Tout d'abord, les deux boucles «send ()» et «recv ()» ont disparu.

Ensuite, nous avons introduit le module selectors et créé une instanceDefaultSelector. La bibliothèque standard Python selectors est une encapsulation de select / poll / epoll / kqueue. DefaultSelector sélectionnera le meilleur module en fonction du système d'exploitation. Toutes les versions de Linux 2.5.44 et supérieures sont ʻepoll`.

J'ai également enregistré une fonction de rappel à gérer après l'événement inscriptible ʻEVENT_WRITE et l'événement lisible ʻEVENT_READ de socket se sont produits.

La structure du code a été nettoyée et les notifications de blocage sont laissées au système d'exploitation. Cependant, pour télécharger 10 pages différentes, vous avez besoin de 10 instances Crawler et 20 événements se produiront. Comment obtiendriez-vous l'événement qui vient de se produire à partir de selector et exécuteriez le rappel correspondant?

3-5-3. Boucle d'événements

La seule façon de résoudre le problème ci-dessus est d'adopter l'ancienne méthode. En d'autres termes, c'est une boucle. Accédez au module selector et attendez qu'il vous indique quel événement s'est produit et quel rappel appeler. Cette boucle de notification d'événement en attente s'appelle une boucle d'événement (https://ja.wikipedia.org/wiki/%E3%82%A4%E3%83%99%E3%83%B3%E3%83] % 88% E3% 83% AB% E3% 83% BC% E3% 83% 97 #: ~: texte =% E3% 82% A4% E3% 83% 99% E3% 83% B3% E3% 83% 88 % E3% 83% AB% E3% 83% BC% E3% 83% 97% 20 (événement% 20loop)% E3% 80% 81,% E3% 80% 81% E3% 81% 9D% E3% 82% 8C % E3% 82% 89% E3% 82% 92% E3% 83% 87% E3% 82% A3% E3% 82% B9% E3% 83% 91% E3% 83% 83% E3% 83% 81% EF % BC% 88% E9% 85% 8D% E9% 80% 81% EF% BC% 89).

def loop():
    while not stopped:
        #Bloquer jusqu'à ce qu'un événement se produise
        events = selector.select()
        for event_key, event_mask in events:
            callback = event_key.data
            callback(event_key, event_mask)

Dans le code ci-dessus, une variable globale appelée «arrêté» contrôle le moment où la boucle d'événements s'arrête. Lorsque tous les path_todos sont consommés, remplacezarrêt par Vrai.

Et selector.select () est un appel bloquant. Si l'événement ne se produit pas ici, l'application n'a rien à gérer et doit être bloquée jusqu'à ce que l'événement se produise. Comme vous pouvez vous y attendre, lors du téléchargement d'une seule page Web, vous pourrez faire send () et recv () après connect (), donc l'efficacité du traitement est la même que la méthode de blocage synchrone. .. La raison est que même si vous ne bloquez pas avec connect () ou recv (), cela bloquera avec select ().

Ainsi, la fonction selector (ci-après appelée ʻepoll / kqueue) est conçue pour résoudre les accès parallèles à grande échelle. La fonction selector` est à son meilleur lorsqu'il y a un grand nombre d'appels non bloquants dans le système et que les événements peuvent être générés presque aléatoirement.

Le code ci-dessous a créé 10 tâches de téléchargement et lancé une boucle d'événements.

if __name__ == '__main__':
    elapsed_times = 0
    for _ in range(10):
        selector = DefaultSelector()
        stopped = False
        paths_todo = {'/', '/1', '/2', '/3', '/4', '/5', '/6', '/7', '/8', '/9'}
        start = time.time()
        for path in paths_todo:
            crawler = Crawler(path)
            crawler.fetch()
        loop()
        elapsed_time = time.time() - start
        elapsed_times += elapsed_time
        print(f"elapsed_time: {(elapsed_time):.2f}[sec]")
    print(f"mean_elapsed_time: {(elapsed_times/10):.2f}[sec]")

Résultat de l'exécution:

elapsed_time: 0.29[sec]
elapsed_time: 0.27[sec]
elapsed_time: 0.24[sec]
elapsed_time: 0.27[sec]
elapsed_time: 0.29[sec]
elapsed_time: 0.29[sec]
elapsed_time: 0.32[sec]
elapsed_time: 0.30[sec]
elapsed_time: 0.30[sec]
elapsed_time: 0.34[sec]
mean_elapsed_time: 0.29[sec]

Le temps moyen pour 10 fois est de 0,29 seconde. vous êtes fort. Résolution du problème du téléchargement simultané de 10 pages avec "boucle d'événement + rappel" dans un seul thread. Il s'agit de programmation asynchrone. Il y a une boucle for, on dirait qu'elle crée des instances Crawler en séquence et appelle la méthode fetch, mais la méthode fetch ne gère que connect () et l'enregistrement des événements. est. Et, en termes de temps d'exécution, les tâches de téléchargement ont clairement été effectuées en même temps.

Procédure de traitement asynchrone du code ci-dessus:

  1. Créez une instance Crawler
  2. Appelez la méthode fetch pour créer une connexion socket et enregistrer un événement inscriptible dans le selector
  3. fetch n'a pas de processus de blocage, alors revenez immédiatement
  4. Répétez les étapes ci-dessus 10 fois pour ajouter les 10 tâches de téléchargement à la boucle d'événements
  5. Appelez une boucle d'événements, entrez la première boucle et bloquez pour écouter les événements
  6. Lorsqu'une tâche de téléchargement ʻEVENT_WRITE se produit, rappelez sa méthode connected` et quittez la première boucle.
  7. Entrez dans la deuxième boucle ronde et lorsqu'un événement se produit sur une tâche de téléchargement, exécutez sa fonction de rappel; à ce stade, il devient imprévisible quel événement se produira et dans le précédent connecté ʻEVENT_READ peut se produire, et ʻEVENT_WRITE d'autres tâches peuvent se produire (à ce moment, le temps bloqué par une tâche est utilisé pour exécuter d'autres tâches).
  8. La boucle continue jusqu'à ce que toutes les tâches de téléchargement aient été effectuées
  9. Quittez la boucle et quittez tout le programme

3-5-4. Résumé

Nous avons vu de la méthode de blocage synchrone à la méthode non bloquante asynchrone. Et maintenant, vous pouvez faire de la magie noire pour traiter plusieurs tâches de blocage d'E / S réseau en parallèle dans un seul thread. Comparé au multi-threading, il ne change même pas de thread. L'exécution de rappel est un appel de fonction qui se termine dans la pile du thread. De plus, les performances sont également excellentes. Le nombre de tâches pouvant être traitées simultanément sur un seul serveur est passé de dizaines de milliers à des centaines de milliers.

C'est la fin de la prise en charge de la programmation asynchrone pour certains langages de programmation. Les ingénieurs doivent passer beaucoup de temps à utiliser directement ʻepoll` pour enregistrer les événements et les rappels, gérer les boucles d'événements et concevoir les rappels.

Comme vous pouvez le voir dans le contenu jusqu'à présent, quel que soit le langage que vous utilisez, si vous effectuez une programmation asynchrone, vous ne pouvez pas échapper au modèle "boucle d'événement + rappel" ci-dessus. Cependant, vous n'utilisez peut-être pas ʻepoll, et ce n'est peut-être pas une boucle While`. Cependant, toutes sont des méthodes asynchrones du modèle "Je vous apprendrai plus tard".

Alors, pourquoi certaines programmations asynchrones ne voient-elles pas des modèles de rappel? Nous examinerons cela à l’avenir. Vous n'avez pas parlé de Corroutine, l'exécutif de la programmation asynchrone de Python.

4. Moyens d'améliorer les E / S asynchrones de Python

De là, je vais expliquer comment l'écosystème de programmation asynchrone de Python a hérité du modèle "boucle d'événement + rappel" mentionné ci-dessus, puis voir comment il a évolué pour devenir le modèle de collout natif ʻasyncio`. Faisons le.

4-1. L'enfer des rappels

Dans [3. Asynchronous I / O Evolution Path](# 3-Asynchronous io Evolution Path), nous avons vu la structure de base de "boucle d'événement + rappel" qui réalise une programmation asynchrone avec un seul thread. Certes, "boucle d'événement + rappel" peut grandement améliorer l'efficacité du programme. Cependant, le problème n'est pas encore résolu. Le projet réel peut être très complexe, vous devez donc prendre en compte les problèmes suivants:

Dans un vrai projet, ces problèmes sont inévitables. Et derrière le problème se trouvent les lacunes du modèle de rappel.

def callback_1():
    #En traitement
    def callback_2():
        #En traitement
        def callback_3():
            #En traitement
            def callback_4():
                #En traitement
                def callback_5():
                    #En traitement
                async_function(callback_5)
            async_function(callback_4)
        async_function(callback_3)
    async_function(callback_2)
async_function(callback_1)

(Veuillez pardonner aux abonnés Lisp)

Lors de l'écriture d'un code de programmation synchrone, le traitement correspondant est normalement effectué de haut en bas.

do_a()
do_b()

Si do_b () dépend du résultat dedo_a (), et do_a () ʻest un appel asynchrone, nous ne savons pas quand le résultat de do_a ()sera retourné. Le traitement suivant doit être passé àdo_a ()sous la forme d'un callback, ce qui garantit quedo_a ()sera terminé avant l'exécution dedo_b ()`.

do_a(do_b())

Et si tous les longs flux de traitement sont rendus asynchrones, il en sera ainsi.

do_a(do_b(do_c(do_d(do_e(do_f(...))))))

Le style ci-dessus est appelé "style Callback Hell". Cependant, le problème principal n'est pas l'apparence, mais la nécessité de changer la structure d'origine de haut en bas de l'extérieur vers l'intérieur. D'abord, do_a (), puis do_b (), puis do_c (), ..., et ensuite au plus profond do_f (). Dans le processus synchrone, do_b () après do_a () signifie que le pointeur d'instruction du thread contrôle le flux. Cependant, lorsqu'il s'agit de modèles de rappel, les contrôles de flux sont soigneusement placés par les ingénieurs.

L'objet sock dans [3-1 Synchronous Version](# 3-1-Synchronous Blocking Method) est réutilisé de haut en bas, tandis que [3-5 Callback Version](# 3-5-Amélioration de la méthode non bloquante asynchrone) vous oblige à instancier la classe Crawler et à stocker l'objet chaussette dans self. Sans un style de programmation orienté objet, vous auriez à passer l'état que vous devez partager à chaque fonction de rappel, comme une touche de bâton. Et vous devez planifier à l'avance et concevoir soigneusement les états à partager entre plusieurs appels asynchrones.

Une série de callbacks constitue une chaîne d'appels (“Method Chain”). Par exemple, il y a une chaîne de do_a () à do_f (). Que pensez-vous si do_d () provoque une erreur? La chaîne est cassée et vous perdez l'état de toucher le bâton. Et ["Stack Trace"](https://ja.wikipedia.org/wiki/%E3%82%B9%E3%82%BF%E3%83%83%E3%82%AF%E3%83% 88% E3% 83% AC% E3% 83% BC% E3% 82% B9) sera détruit. Par exemple, do_d () provoque une erreur, et l'appel à do_d () échoue dansdo_c (), doncdo_c () ʻ lui-même provoque également une erreur. De même,do_b ()etdo_a () provoqueront une erreur, et le journal des erreurs signalera seulement que "l'appel à do_a ()` a provoqué une erreur ". Cependant, c'est «do_d ()» qui a causé l'erreur. Pour éviter cela, vous devez détecter toutes les erreurs et renvoyer les données en tant que valeur de retour de la fonction. Et toutes les fonctions de rappel doivent vérifier la valeur de retour de la fonction précédente, ce qui empêche "Hiding Error".

Ainsi, alors que la lisibilité n'est qu'une question d'apparence, deux inconvénients, tels que la rupture des traces de pile et la difficulté à partager et à gérer l'état, contribuent à l'extraordinaire difficulté de la programmation asynchrone basée sur le rappel. Chaque langage de programmation essaie de résoudre ce problème. Merci à cela, "Promise", "Coroutine" Une solution telle que / wiki /% E3% 82% B3% E3% 83% AB% E3% 83% BC% E3% 83% 81% E3% 83% B3) est née.

4-2. Défis

Le modèle "boucle d'événement + rappel" a résolu les difficultés de la programmation asynchrone telles que "lorsque la tâche asynchrone est terminée" et "comment gérer la valeur de retour de l'appel asynchrone". Cependant, les rappels compliquent le programme. Avant que nous puissions penser aux moyens d'éviter cette lacune, nous devons d'abord clarifier les éléments essentiels. Pourquoi un rappel est-il obligatoire? Et quelle est la raison du partage et de la gestion de l'État, qui est l'un des inconvénients en premier lieu?

Le partage et la gestion de l'État sont nécessaires parce que le programme doit savoir ce qu'il a fait, ce qu'il fait et ce qu'il va faire. En d'autres termes, le programme a besoin de connaître son état actuel, et il doit également toucher à chaque rappel et le maintenir en place.

La gestion de l'état entre plusieurs rappels est difficile. Alors pourquoi ne pas laisser chaque callback gérer uniquement son propre état? La chaîne d'appels rend la gestion des erreurs difficile. Alors pourquoi ne pas utiliser une chaîne d'appels? Mais si vous n'utilisez pas de chaîne d'appels, comment la fonction appelée sait-elle si la fonction précédente est terminée? Alors qu'en est-il de laisser un rappel notifier le prochain rappel? En premier lieu, un rappel peut être considéré comme une tâche en attente.

Notification mutuelle entre les tâches, donnant à chaque tâche son propre état, c'est exactement l'ancienne pratique de programmation "Cooperative Multitasking". Mais vous devez le planifier dans un seul thread. De là, vous pouvez avoir un cadre de pile et connaître facilement votre statut ["Coroutine"](https://ja.wikipedia.org/wiki/%E3%82%B3%E3%83%AB% E3% 83% BC% E3% 83% 81% E3% 83% B3 #: ~: texte =% E3% 82% B3% E3% 83% AB% E3% 83% BC% E3% 83% 81% E3% 83% B3% EF% BC% 88% E8% 8B% B1% 3A% 20co% 2Droutine,% E5% 8B% 95% E4% BD% 9C% E3% 82% 92% E8% A1% 8C% E3% 81 % 86% E3% 81% 93% E3% 81% A8% E3% 81% AB% E3% 82% 88% E3% 82% 8B% E3% 80% 82) entre en jeu. Bien sûr, il est également possible de se notifier entre Koruchin.

4-3. Corroutine

Corroutine est ["Substitute"](https://ja.wikipedia.org/wiki/%E3%82%B5%E3%83%96%E3%83%AB%E3%83%BC%E3%83%81 C'est une généralisation de% E3% 83% B3). La stratégie de planification Coroutine est ["Non préemptive"](http://e-words.jp/w/%E3%83%8E%E3%83%B3%E3%83%97%E3%83%AA%E3 % 82% A8% E3% 83% B3% E3% 83% 97% E3% 83% 86% E3% 82% A3% E3% 83% 96% E3% 83% 9E% E3% 83% AB% E3% 83 % 81% E3% 82% BF% E3% 82% B9% E3% 82% AF.html #: ~: texte =% E3% 83% 8E% E3% 83% B3% E3% 83% 97% E3% 83 % AA% E3% 82% A8% E3% 83% B3% E3% 83% 97% E3% 83% 86% E3% 82% A3% E3% 83% 96% E3% 83% 9E% E3% 83% AB % E3% 83% 81% E3% 82% BF% E3% 82% B9% E3% 82% AF% E3% 81% A8% E3% 81% AF% E3% 80% 81% E4% B8% 80% E3 % 81% A4% E3% 81% AE% E5% 87% A6% E7% 90% 86% E8% A3% 85% E7% BD% AE, CPU% E3% 82% 92% E7% AE% A1% E7 % 90% 86% E3% 81% 97% E3% 81% AA% E3% 81% 84% E6% 96% B9% E5% BC% 8F% E3% 80% 82), avec entrées multiples Vous pouvez contrôler la suspension et la reprise.

Un sous-programme est un bloc de code appelable défini par un langage de programmation. En d'autres termes, c'est un ensemble d'instructions emballées pour réaliser une fonction. Dans les langages de programmation généraux, les sous-programmes sont réalisés par des structures telles que des fonctions et des méthodes.

4-4. Collout basé sur un générateur

Objet spécial en Python ["Generator"](https://ja.wikipedia.org/wiki/%E3%82%B8%E3%82%A7%E3%83%8D%E3%83%AC% E3% 83% BC% E3% 82% BF_ (% E3% 83% 97% E3% 83% AD% E3% 82% B0% E3% 83% A9% E3% 83% 9F% E3% 83% B3% E3 Il y a% 82% B0)). Les caractéristiques du générateur sont similaires à celles du coroutum, il peut être interrompu lors d'une itération et ne perd pas son état précédent jusqu'à l'itération suivante.

Le générateur a été amélioré en Python 2.5 (PEP 342) pour permettre des collouts simples dans le générateur. Le titre proposé pour cette amélioration est «Coroutines via des générateurs améliorés». Grâce à PEP 342, le générateur peut maintenant suspendre l'exécution à «yield» et renvoyer des données. Vous pouvez également utiliser send pour envoyer des données au générateur, et throw pour provoquer une erreur dans le générateur et le terminer.

Refactorisez ensuite le programme de grattage avec un collout basé sur un générateur.

4-4-1. Objet futur

Comment connaissez-vous le résultat d'un appel asynchrone si vous quittez la méthode de rappel? Ici, nous définissons d'abord l'objet. Lorsque le résultat de l'appel asynchrone est renvoyé, enregistrez-le. Cet objet est appelé l'objet «Future».

import socket
import time
from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ


class Future:
    def __init__(self):
        self.result = None
        self._callbacks = []

    def add_done_callback(self, fn):
        self._callbacks.append(fn)

    def set_result(self, result):
        self.result = result
        for fn in self._callbacks:
            fn(self)

L'objet Future a une variable d'instance result et stocke les futurs résultats de l'exécution. Et la méthode set_result définit le result, et après avoir lié la valeur au result, elle exécute la fonction de rappel pré-ajoutée à l'objet Future. Les fonctions de rappel peuvent être ajoutées avec la méthode ʻadd_done_callback () `.

N'avez-vous pas dit que vous quittiez la méthode de rappel? Ne paniquez pas. Comme expliqué dans [Ici](# 3-5-4-Summary), si vous faites de la programmation asynchrone, vous ne pouvez pas échapper au modèle "boucle d'événement + rappel". Et le rappel ici est un peu différent du précédent.

4-4-2. Refactoring Crawler

Quoi qu'il en soit, j'ai créé un objet Future qui représente les données futures. Refactorisons le code de scraping avec l'objet Future.

class Crawler:
    def __init__(self, path):
        self.path = path
        self.response = b''

    def fetch(self):
        sock = socket.socket()
        sock.setblocking(False)
        try:
            sock.connect(('example.com', 80))
        except BlockingIOError:
            pass
        f = Future()

        def on_connected():
            f.set_result(None)

        # fileno()La méthode renvoie le descripteur de fichier de socket sous la forme d'un type entier court
        selector.register(sock.fileno(), EVENT_WRITE, on_connected)
        yield f
        selector.unregister(sock.fileno())
        get = f'GET {self.path} HTTP/1.0\r\nHost: example.com\r\n\r\n'
        sock.send(get.encode('ascii'))

        global stopped
        while True:
            f = Future()

            def on_readable():
                f.set_result(sock.recv(4096))

            selector.register(sock.fileno(), EVENT_READ, on_readable)
            #Recevoir le résultat envoyé
            chunk = yield f
            selector.unregister(sock.fileno())
            if chunk:
                self.response += chunk
            else:
                paths_todo.remove(self.path)
                if not paths_todo:
                    stopped = True
                break

Par rapport à la version de rappel précédente, cela fait beaucoup de différence. J'ai utilisé l'expression yield pour la méthode fetch et en ai fait un générateur. Le générateur peut être démarré une fois avec next () ou avec send (None), et sera interrompu quand il atteindra yield. Alors, comment le générateur «fetch» redémarre-t-il?

4-4-3. Objet de tâche

Vous devez suivre une règle pour résoudre le problème ci-dessus. C'est le "principe de responsabilité unique" (https://en.wikipedia.org/wiki/Single-responsibility_principle). Par conséquent, nous allons créer ici quelque chose qui jouera le rôle de redémarrer le générateur et de gérer son état. Nommez-le «Tâche».

class Task:
    def __init__(self, coro):
        #Objet Corroutine
        self.coro = coro
        f = Future()
        f.set_result(None)
        self.step(f)

    def step(self, future):
        try:
            #Une fois envoyé, le générateur fonctionnera jusqu'au prochain rendement
            # next_future est l'objet retourné par yield
            next_future = self.coro.send(future.result)
        except StopIteration:
            return
        next_future.add_done_callback(self.step)

L'objet Task dans le code ci-dessus contient un objet coroutine coro. La tâche à gérer est une tâche en attente (coroutine), donc le coro ici est le générateur fetch. Et il y a une méthode step, qui est exécutée une fois à l'initialisation. La méthode step appelle la méthode send () du générateur, et à l'initialisation elle devientsend (None), donc l'itération initiale de coro, c'est-à-direfetch (), est effectuée.

Une fois que send () est terminé, vous obtiendrez le prochain futur et vous utiliserez ʻadd_done_callback pour ajouter une fonction de rappel appelée step () au prochain futur`.

Ensuite, jetons un œil au générateur fetch (). La logique métier telle que l'envoi d'une demande et la réception d'une réponse est effectuée en interne. Et la fonction de rappel enregistrée dans selector est également devenue simple. Les deux «yield» retournent les «futurs» correspondants et les reçoivent dans «Task.step ()». Vous avez maintenant connecté avec succès Task, Future et Coroutine.

Initialise l'objet Task et fetch () s'exécute jusqu'au premier yield. Alors, comment le récupérer?

4-4-4. Boucle d'événements

La boucle d'événements est de retour. Lorsque vous atteignez le premier rendement, attendez que le ʻEVENT_WRITE` enregistré se produise. La boucle d'événement, comme un battement de cœur, continuera à se déplacer une fois qu'elle commence à pulser.

def loop():
    while not stopped:
        #Bloquer jusqu'à ce qu'un événement se produise
        events = selector.select()
        for event_key, event_mask in events:
            callback = event_key.data
            callback()


if __name__ == '__main__':
    elapsed_times = 0
    for _ in range(10):
        selector = DefaultSelector()
        stopped = False
        paths_todo = {'/', '/1', '/2', '/3', '/4', '/5', '/6', '/7', '/8', '/9'}
        start = time.time()
        for path in paths_todo:
            crawler = Crawler(path)
            Task(crawler.fetch())
        loop()
        elapsed_time = time.time() - start
        elapsed_times += elapsed_time
        print(f"elapsed_time: {(elapsed_time):.2f}[sec]")
    print(f"mean_elapsed_time: {(elapsed_times/10):.2f}[sec]")

Résultat de l'exécution:

elapsed_time: 0.30[sec]
elapsed_time: 0.28[sec]
elapsed_time: 0.28[sec]
elapsed_time: 0.29[sec]
elapsed_time: 0.28[sec]
elapsed_time: 0.25[sec]
elapsed_time: 0.26[sec]
elapsed_time: 0.26[sec]
elapsed_time: 0.28[sec]
elapsed_time: 0.26[sec]
mean_elapsed_time: 0.27[sec]

Le temps moyen pour 10 fois est de 0,27 seconde. Cette fois, la «boucle» est un peu différente d'avant. callback () ne reçoit plus ʻevent_key et event_mask. En d'autres termes, le rappel ici n'a pas besoin de savoir qui a déclenché l'événement, il peut être vu en combinaison avec fetch (), mais le rappel définit simplement la valeur à future avec set_result ()`. Il vous suffit de l'installer. Et vous n'avez pas besoin de savoir de quel "futur" il s'agit, le collout peut sauvegarder son propre état, et ce n'est pas grave si vous connaissez votre "futur". Vous n'avez pas à vous soucier de définir une valeur, le collout gérera tout pour vous.

4-4-5. Résumé de "Generator Collout Style VS Callback Style"

Style de rappel:

Style de collout du générateur:

4-4-6. Refactoriser davantage le code

Le code est un peu difficile à lire. Ici, que dois-je faire si on me demande d'améliorer la résistance aux défauts et la fonctionnalité de «fetch»? De plus, la logique technique (liée à «socket») et la logique métier (traitement des demandes et des réponses) sont mélangées, ce qui n'est pas bon.

Cependant, il y a des yields ici, et si vous voulez les résumer, vous devez en faire des générateurs. De plus, fetch () lui-même est un générateur, et jouer avec le générateur à l'intérieur du générateur peut rendre le code encore plus compliqué.

Les concepteurs de Python ont également remarqué ce problème et ont fourni un jouet pour jouer avec le générateur dans un générateur appelé yield from.

4-5. Collout basé sur un générateur amélioré avec rendement

4-5-1. Rendement de l'introduction de la grammaire

yield from est une syntaxe introduite à partir de Python 3.3 (PEP 380). Le PEP 380 est principalement destiné à éliminer l'inconvénient de jouer avec le générateur à l'intérieur du générateur et a deux fonctions.

L'un d'eux est que vous n'avez pas à «céder» le sous-générateur en tournant l'itération, vous pouvez directement «céder» à partir de «. Les deux types de générateurs suivants sont équivalents en fonction.

def gen_1():
    sub_gen = range(10)
    yield from sub_gen


def gen_2():
    subgen = range(10)
    for item in subgen:
        yield item

L'autre est la possibilité d'ouvrir des canaux d'intercommunication entre le sous-générateur et le générateur principal.

def gen():
    yield from sub_gen()


def sub_gen():
    while 1:
        x = yield
        yield x + 1


def main():
    g = gen()
    next(g)  #Courez jusqu'au premier rendement
    retval = g.send(1)  # gen()Semble envoyer des données à, mais en fait sous_gen()Envoi à
    print(retval)  # sub_gen()Sort le 2 calculé à partir de
    g.throw(StopIteration)  # sub_gen()Causer une erreur à

Le code ci-dessus montre la fonction de communication mutuelle de «rendement de». yield from ouvre un canal de communication dansgen ()entresub_gen ()etmain (). Les données «1» peuvent être envoyées directement de «main ()» à «sub_gen ()», et la valeur calculée «2» peut être renvoyée directement de «sub_gen ()» à «main ()». .. Vous pouvez également terminer sub_gen () en envoyant une erreur directement de main () à sub_gen ().

Au fait, «yield from» peut être non seulement «yield from » mais aussi «yield from ».

4-5-2. Refactorisation

Connexion abstraite socket

def connect(sock, address):
    f = Future()
    sock.setblocking(False)
    try:
        sock.connect(address)
    except BlockingIOError:
        pass

    def on_connected():
        f.set_result(None)

    selector.register(sock.fileno(), EVENT_WRITE, on_connected)
    yield from f
    selector.unregister(sock.fileno())

Chargement de la réponse abstraite

def read(sock):
    f = Future()

    def on_readable():
        f.set_result(sock.recv(4096))

    selector.register(sock.fileno(), EVENT_READ, on_readable)
    chunk = yield from f
    selector.unregister(sock.fileno())
    return chunk


def read_all(sock):
    response = []
    chunk = yield from read(sock)
    while chunk:
        response.append(chunk)
        chunk = yield from read(sock)
    return b''.join(response)

Refactorisation de «Crawler»

class Crawler:
    def __init__(self, path):
        self.path = path
        self.response = b''

    def fetch(self):
        global stopped
        sock = socket.socket()
        yield from connect(sock, ('example.com', 80))
        get = f'GET {self.path} HTTP/1.0\r\nHost: example.com\r\n\r\n'
        sock.send(get.encode('ascii'))
        self.response = yield from read_all(sock)
        paths_todo.remove(self.path)
        if not paths_todo:
            stopped = True

Le code jusqu'à ce point est bien. La partie réutilisable est abstraite en tant que fonction. La valeur du sous-générateur peut également être obtenue avec «rendement de». Cependant, une chose à noter est que nous utilisons «yield from» au lieu de «yield» lors du retour d'un objet «futur». yield peut être appliqué aux objets Python ordinaires, mais pas yield from. Maintenant, nous devons modifier le Future pour en faire un objet ʻiterable`.

class Future:
    def __init__(self):
        self.result = None
        self._callbacks = []

    def add_done_callback(self, fn):
        self._callbacks.append(fn)

    def set_result(self, result):
        self.result = result
        for fn in self._callbacks:
            fn(self)

    def __iter__(self):
        yield self
        return self.result

Je viens d'ajouter «iter». Bien sûr, vous ne devez pas nécessairement utiliser yield from, mais vous pouvez le laisser comme yield. Cependant, il est préférable de les utiliser correctement pour distinguer s'ils ont des collouts basés sur des générateurs ou simplement des générateurs. Par conséquent, après l'introduction de yield from de Python 3.3, il était déconseillé de créer des collouts avec yield. Il y a aussi l'avantage de pouvoir envoyer librement des données entre des collouts en utilisant la fonction de communication mutuelle de «rendement de».

4-5-3. Résumé de l'amélioration du collout en fonction du rendement

En améliorant le collout par "yield from", nous avons pu augmenter le niveau d'abstraction du code et simplifier davantage les questions liées à la logique métier. La fonction d'intercommunication facilite l'échange de données entre les coroutines. Avec cela, la programmation asynchrone Python a fait de grands progrès.

Et les développeurs du langage Python ont également profité pleinement de yield from. Le framework de programmation asynchrone Python Tulip, dirigé par Guido, a également évolué à une vitesse énorme, changeant son nom en ʻasyncio` dans Python 3.4 et l'adoptant temporairement comme bibliothèque standard (à titre provisoire).

4-6. asyncio

4-6-1. Introduction d'Asyncio

ʻAsyncio est un framework d'E / S asynchrones ([PEP 3156](https://www.python.org/dev/peps/pep-3156/)) introduit expérimentalement à partir de Python 3.4. ʻAsyncio a été fourni en Python comme infrastructure pour la programmation d'E / S asynchrones par Cortine. Les composants de base se composent de Event Loop, Coroutine, Task, Future et d'autres modules auxiliaires.

Lorsque ʻasyncio a été introduit, un décorateur appelé @ asyncio.coroutinea également été fourni. Vous pouvez le marquer comme collout en l'attachant à une fonction qui utiliseyield from`, mais vous n'êtes pas obligé de l'utiliser.

Avec l'aide de yield from de Python 3.4, il est plus facile de créer des corouties, mais comme pour les problèmes historiques, les gens ont une distinction et une relation entre ** générateurs ** et ** coroutines **. Je ne comprends pas. Et je ne connais pas la différence entre "yield" et "yield from". Cette confusion viole les règles de "Python Zen".

Ainsi, à partir de Python 3.5, les concepteurs de Python ont ajouté à la hâte la syntaxe ʻasync / await ([PEP 492](https://www.python.org/dev/peps/pep-0492/)). , A montré un support explicite pour Corroutine. C'est ce qu'on appelle un ** collout natif **. Les deux styles de collout ʻasync / await et yield from ont la même implémentation interne et sont compatibles l'un avec l'autre.

Et à partir de Python 3.6, ʻasyncio` a officiellement rejoint la bibliothèque standard. Ce qui précède est la trajectoire évolutive des E / S asynchrones dans CPython.

4-6-2. Asyncio et collout natif

Découvrez la commodité de la syntaxe ʻasyncio et ʻasync / await.

import asyncio
import aiohttp
import time


loop = asyncio.get_event_loop()


async def fetch(path):
    async with aiohttp.ClientSession(loop=loop) as session:
        async with session.get(path) as response:
            response = await response.read()
            return response


if __name__ == '__main__':
    host = 'http://example.com'
    paths_todo = {'/', '/1', '/2', '/3', '/4', '/5', '/6', '/7', '/8', '/9'}
    elapsed_times = 0
    for _ in range(10):
        start = time.time()
        tasks = [fetch(host + path) for path in paths_todo]
        loop.run_until_complete(asyncio.gather(*tasks))
        elapsed_time = time.time() - start
        elapsed_times += elapsed_time
        print(f"elapsed_time: {(elapsed_time):.2f}[sec]")
    print(f"mean_elapsed_time: {(elapsed_times/10):.2f}[sec]")

Résultat de l'exécution:

elapsed_time: 0.27[sec]
elapsed_time: 0.25[sec]
elapsed_time: 0.25[sec]
elapsed_time: 0.26[sec]
elapsed_time: 0.26[sec]
elapsed_time: 0.25[sec]
elapsed_time: 0.28[sec]
elapsed_time: 0.25[sec]
elapsed_time: 0.25[sec]
elapsed_time: 0.25[sec]
mean_elapsed_time: 0.26[sec]

Comparé au style collout basé sur un générateur, le style ʻasyncio` est assez différent.

La raison pour laquelle vous n'utilisez pas «socket» par vous-même pour envoyer des requêtes HTTP et recevoir des réponses est qu'il est extrêmement difficile de bien gérer le protocole HTTP dans les affaires réelles, et si vous avez un client HTTP asynchrone avec toutes les fonctionnalités, vous pouvez le faire vous-même. Parce que tu n'es pas obligé.

Par rapport à la version de blocage synchrone du code:

Résumé

Nous avons examiné de plus près l'évolution et la mécanique de la programmation asynchrone en Python. Au final, nous avons atteint N fois plus d'efficacité avec un code aussi simple qu'un traitement synchrone. Et il n'a pas les inconvénients de l'enfer du rappel.   Plus de détails sur la façon d'utiliser ʻasyncio, ses forces et faiblesses, et comment il le distingue des autres solutions d'E / S asynchrones au sein de l'écosystème Python, ʻasyncio, seront discutés dans un autre article.

référence

A Web Crawler With asyncio Coroutines Latency numbers every programmer should know

Recommended Posts

Compréhension complète de la programmation asynchrone Python
Compréhension complète de la programmation orientée objet de Python
Compréhension complète de la fonction numpy.pad
Comprendre le mémo de la programmation collective des connaissances
Une compréhension approximative de python-fire et un mémo
Un mémorandum sur l'utilisation de la fonction d'entrée de Python
Mémorandum sur le QueryDict de Django
[Python] Une compréhension approximative du module de journalisation
[Python] Une compréhension approximative des itérables, des itérateurs et des générateurs
[PyTorch] Un peu de compréhension de CrossEntropyLoss avec des formules mathématiques
Traitement asynchrone de Python ~ Comprenez parfaitement async et attendez ~
Programmation asynchrone avec libev # 2
[Note] Début de la programmation
Recrutement de maîtres de programmation
Simulation de comp gacha
Mémorandum elasticsearch_dsl
Programmation asynchrone avec libev
Programmation asynchrone avec libev # 3
Créez instantanément un diagramme de données 2D à l'aide de matplotlib de python
[Pour les débutants] Un résumé en mots des langages de programmation populaires (version 2018)