Les principaux systèmes d'exploitation modernes sont Mac OS, UNIX, Linux et Windows. Ces OS prennent en charge la fonction "multitâche".
Qu'est-ce que le multitâche? Vous pourriez penser, par exemple, dans une situation où vous lancez un navigateur, écoutez de la musique et rédigez un rapport dans Word, au moins trois tâches sont en cours en même temps. Et en plus des tâches à l'avant, diverses tâches liées au système d'exploitation s'exécutent secrètement dans les coulisses.
Il est facile de comprendre qu'un processeur multicœur peut gérer le multitâche, mais un processeur monocœur peut également gérer le multitâche. Le système d'exploitation exécute chaque tâche à tour de rôle. Par exemple, la tâche 1 est de 0,01 seconde, la tâche 2 de 0,01 seconde, la tâche 3 de 0,01 seconde, la tâche 1 de 0,01 seconde, et ainsi de suite. Le processeur est rapide, on a donc l'impression qu'il est presque simultané. Cette exécution alternative est souvent appelée ["calcul simultané"](https://ja.wikipedia.org/wiki/%E4%B8%A6%E8%A1%8C%E8%A8%88%E7 % AE% 97).
Bien entendu, les processeurs monocœur sont exécutés à leur tour, donc dans le vrai sens du terme, la progression simultanée n'est possible que pour les processeurs multicœurs. Le traitement simultané de plusieurs tâches sur chaque cœur au moment d'un processeur multicœur est "[calcul parallèle](https://ja.wikipedia.org/wiki/%E4%B8%A6%E5%] 88% 97% E8% A8% 88% E7% AE% 97) ". Dans la plupart des cas, le nombre de tâches exécutées dépasse de loin le nombre de cœurs, de sorte que le travail d '«exécution par équipes» est également effectué en multicœur.
Pour le système d'exploitation, une tâche est un processus. Par exemple, le lancement d'un navigateur crée un processus de navigateur unique. De même, lorsque vous ouvrez Word, un processus Word est créé.
Un processus n'est pas nécessairement un processus. Par exemple, Word effectue de nombreux traitements, tels que la surveillance des entrées utilisateur, la vérification orthographique et l'affichage de l'interface utilisateur. Ces «sous-tâches» sont appelées Threads. Il y a au moins un thread par processus. Lorsqu'il y a plusieurs threads, ils se relaient comme des processus.
Il existe deux façons principales de gérer le multitâche en Python en même temps.
Bien sûr, vous pouvez avoir plusieurs threads dans plusieurs processus, mais cela n'est pas recommandé en raison de la complexité du modèle.
Lors du traitement du multitâche, la communication et la coopération entre les tâches peuvent être nécessaires, la tâche 1 peut devoir être interrompue lorsque la tâche 2 est exécutée, et la tâche 3 et la tâche 4 peuvent ne pas pouvoir se poursuivre en même temps. , Le programme devient un peu compliqué.
(Source: Présentation de la conférence sur les logiciels système)
une fonction | La description |
---|---|
start() | Démarrer un fil |
setName() | Donnez un nom au fil |
getName() | Obtenez le nom du fil |
setDaemon(True) | FildémonÀ |
join() | Attendez que le thread termine le traitement |
run() | Exécuter manuellement le traitement des threads |
Les threads Python ne sont pas simulés par des processus, ce sont de vrais [threads POSIX](https://ja.wikipedia.org/wiki/POSIX%E3%82%B9%E3%83%AC%E3%83% 83% E3% 83% 89). Depuis la bibliothèque standard, vous pouvez utiliser deux modules, _thread </ code> et
threading </ code>. Et
_thread </ code> est un module de bas niveau, et
threading </ code> est un module qui l'encapsule. J'utilise donc généralement
threading </ code>.
Vous pouvez démarrer un thread en introduisant une fonction, etc., en créant une instance de Thread </ code> et en le démarrant avec
start </ code>.
import threading
import time
def run(n):
# threading.current_thread().le nom est getName()Appel
print("task: {} (thread name: {})".format(n, threading.current_thread().name))
time.sleep(1)
print('2s')
time.sleep(1)
print('1s')
time.sleep(1)
print('0s')
time.sleep(1)
t1 = threading.Thread(target=run, args=("t1",))
t2 = threading.Thread(target=run, args=("t2",), name='Thread T2') #Ici setName()Est appelé
# start()
t1.start()
t2.start()
# join()
t1.join()
t2.join()
# join()Parce que j'ai appelé
#Le thread principal attend que le thread ci-dessus se termine
#Imprimez quand tout est fait
print(threading.current_thread().name)
Résultat de l'exécution:
task: t1 (thread name: Thread-1)
task: t2 (thread name: Thread T2)
2s
2s
1s
1s
0s
0s
MainThread
Vous pouvez voir que t1 et t2 sont exécutés en alternance. Une des règles d'alternance sera expliquée plus en détail dans GIL après l'opération IO (où l'opération print </ code> s'applique).
Il est également possible d'hériter de Thread </ code> et de personnaliser la méthode
run </ code> de la classe de thread.
import threading
import time
class MyThread(threading.Thread):
def __init__(self, n):
super(MyThread, self).__init__()
self.n = n
# run()Récrire
def run(self):
print("task: {}".format(self.n))
time.sleep(1)
print('2s')
time.sleep(1)
print('1s')
time.sleep(1)
print('0s')
time.sleep(1)
t1 = MyThread("t1")
t2 = MyThread("t2")
t1.start()
t2.start()
Résultat de l'exécution:
task: t1
task: t2
2s
2s
1s
1s
0s
0s
Vous pouvez compter le nombre de threads actifs avec active_count </ code>. Toutefois, dans un environnement REPL, il existe plusieurs threads à surveiller, de sorte que le nombre de threads sera plus élevé que prévu.
Exécutez le code suivant dans un script.
import threading
import time
def run(n):
print("task: {}".format(n))
time.sleep(1)
for i in range(3):
t = threading.Thread(target=run, args=("t{}".format(i),))
t.start()
time.sleep(0.5)
print(threading.active_count())
Résultat de l'exécution:
task: t0
task: t1
task: t2
4
Lorsque le thread principal print </ code> est exécuté, le nombre de threads = 3 + 1 (thread principal) car d'autres threads sont toujours en cours d'exécution.
import threading
import time
def run(n):
print("task: {}".format(n))
time.sleep(0.5)
for i in range(3):
t = threading.Thread(target=run, args=("t{}".format(i),))
t.start()
time.sleep(1)
print(threading.active_count())
Résultat de l'exécution:
task: t0
task: t1
task: t2
1
En ajustant le temps d'exécution et en retardant l'impression </ code> du thread principal, le nombre de threads actifs devient 1 uniquement pour le thread principal.
Démarrez le thread en tant que démon.
import threading
import time
def run(n):
print("task: {}".format(n))
time.sleep(1)
print('3')
time.sleep(1)
print('2')
time.sleep(1)
print('1')
for i in range(3):
t = threading.Thread(target=run, args=("t{}".format(i),))
# setDaemon(True)
t.setDaemon(True)
t.start()
time.sleep(1.5)
print('Le nombre de fils: {}'.format(threading.active_count()))
Résultat de l'exécution:
task: t0
task: t1
task: t2
3
3
3
Le nombre de fils: 4
Puisque t1, t2 et t3 sont définis comme thread démon du thread principal, ils s'arrêtent lorsque le thread principal se termine. Par exemple, la vérification orthographique de Word est un thread démon, qui s'exécute dans une boucle infinie, mais lorsque le thread principal tombe en panne, il s'arrête. 1-5. GIL Lors de l'utilisation d'un processeur multicœur dans d'autres langages de programmation, les threads avec le nombre de cœurs peuvent être exécutés en même temps. Cependant, en Python, un seul thread s'exécute à la fois, ce qui est un processus. En d'autres termes, le multithreading Python est complètement parallèle. La raison est [GIL (Global Interpreter Lock)](https://ja.wikipedia.org/wiki/%E3%82%B0%E3%83%AD%E3%83%BC%E3%83%90%E3 % 83% AB% E3% 82% A4% E3% 83% B3% E3% 82% BF% E3% 83% 97% E3% 83% AA% E3% 82% BF% E3% 83% AD% E3% 83 Il est situé à% 83% E3% 82% AF).
GIL est un type de contrôle exclusif (expliqué plus loin). Lorsque Python a été conçu pour la première fois, nous avons implémenté GIL pour le rendre plus facile à combiner avec la sécurité des données et les bibliothèques de langage C. Lorsque vous exécutez un thread, vous devez obtenir un GIL. Il n'y a qu'un seul processus Python dans un interpréteur Python. Et comme il n'y a qu'un seul GIL dans un processus Python, un seul thread peut être exécuté à la fois. GIL est comme un passeport, et les threads qui n'ont pas de GIL ne peuvent pas entrer dans le CPU. Au fait, GIL est en CPython (distribution Python normale), mais pas en PyPy et Jython. Un autre langage bien connu avec GIL est Ruby.
sys.setcheckinterval </ code>
À titre expérimental, exécutons une simple boucle infinie.
import threading
import multiprocessing
def loop():
x = 0
while True:
x = x ^ 1
for i in range(multiprocessing.cpu_count()):
t = threading.Thread(target=loop)
t.start()
Comme vous pouvez le voir, à cause de GIL, l'utilisation du processeur n'est que d'environ 100% en un seul processus, peu importe vos efforts (il devrait être disponible jusqu'à 400% sur un processeur quad core).
Les ressources sont partagées entre les threads dans le même processus. Et comme les threads sont commutés au hasard et dans le désordre, les données peuvent être perturbées.
import threading
#Économiser de l'argent
balance = 0
def change_it(n):
#Le retrait et le dépôt doivent être de 0
global balance
balance = balance + n
balance = balance - n
def run_thread(n):
for i in range(100000):
change_it(n)
t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)
Comme vous pouvez le voir en exécutant le code ci-dessus plusieurs fois, le résultat est différent de zéro.
balance = balance + n </ code> peut être divisé en deux opérations atomiques.
x = balance + n
balance = x
Le x </ code> ici est une variable locale, et chaque thread a son propre
x </ code>. Lorsque le code ci-dessus est exécuté dans l'ordre, il devient comme suit.
balance = 0 #valeur initiale
t1: x1 = balance + 5 # x1 = 0 + 5 = 5
t1: balance = x1 # balance = 5
t1: x1 = balance - 5 # x1 = 5 - 5 = 0
t1: balance = x1 # balance = 0
t2: x2 = balance + 8 # x2 = 0 + 8 = 8
t2: balance = x2 # balance = 8
t2: x2 = balance - 8 # x2 = 8 - 8 = 0
t2: balance = x2 # balance = 0
balance = 0 #Le résultat est correct
Cependant, si l'ordre est différent, le résultat sera différent.
balance = 0 #valeur initiale
t1: x1 = balance + 5 # x1 = 0 + 5 = 5
t2: x2 = balance + 8 # x2 = 0 + 8 = 8
t2: balance = x2 # balance = 8
t1: balance = x1 # balance = 5
t1: x1 = balance - 5 # x1 = 5 - 5 = 0
t1: balance = x1 # balance = 0
t2: x2 = balance - 8 # x2 = 0 - 8 = -8
t2: balance = x2 # balance = -8
balance = -8 #Le résultat est faux
Ce phénomène de calcul imprévisible entraîne le multithreading est appelé Thread-unsafe.
Pour résoudre ce problème, vous devez verrouiller et contrôler le thread.
import threading
#Économiser de l'argent
balance = 0
def change_it(n):
#Obtenez la serrure
lock.acquire()
global balance
balance = balance + n
balance = balance - n
#Relâchez le verrou
lock.release()
def run_thread(n):
for i in range(100000):
change_it(n)
lock = threading.Lock() #Instancier un verrou
t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)
En utilisant le contrôle exclusif, aucun autre thread ne peut accéder à la ressource tant que le verrou n'est pas libéré. En faisant cela, le résultat du calcul sera toujours 0.
Contrôle exclusif qui peut libérer de manière récursive les verrous imbriqués.
import threading
#Économiser de l'argent
balance = 0
def add_it(n):
lock.acquire()
global balance
balance = balance + n
return balance
def sub_it(n):
lock.acquire()
global balance
balance = balance - n
return balance
def change_it(n):
#Obtenez la serrure
lock.acquire()
global balance
balance = add_it(n)
balance = sub_it(n)
#Verrouillage de libération récursive
lock.release()
def run_thread(n):
for i in range(1000):
change_it(n)
lock = threading.RLock() #Instancier un verrou
t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)
Ici, le verrou est également acquis dans add_it </ code> et
sub_it </ code>. En utilisant le contrôle exclusif récursif, il n'est pas nécessaire de déverrouiller chaque verrou, et tout peut être libéré en un seul coup. Cependant, il est très coûteux en calcul, nous réduisons donc le nombre de boucles.
Avec un contrôle exclusif, les ressources ne peuvent être traitées que par un seul thread à un certain moment, tandis que le sémapho est une limite qui permet le traitement simultané d'un certain nombre de threads. Par exemple, une situation où il y a trois sièges de toilette dans les toilettes, trois personnes les utilisent en même temps et d'autres font la queue est un sémapho.
import threading
import time
def run(n):
semaphore.acquire()
time.sleep(1)
print("current thread: {}\n".format(n))
semaphore.release()
semaphore = threading.BoundedSemaphore(5) #Permet le traitement simultané de 5 threads
for i in range(22):
t = threading.Thread(target=run, args=("t-{}".format(i),))
t.start()
while threading.active_count() != 1:
pass # print threading.active_count()
else:
print('-----Tous les fils sont terminés-----')
Lorsque vous exécutez le code ci-dessus, vous pouvez voir que les chaînes de thread actuelles sont sorties cinq par cinq.
Les événements de thread permettent au thread principal de contrôler d'autres threads. Les méthodes suivantes sont fournies pour Event </ code>.
Méthode | La description |
---|---|
clear | définir l'indicateur sur False |
set | définir l'indicateur sur True |
is_set | Renvoie True lorsque l'indicateur est True |
wait | Continuez à surveiller le drapeau; blocage lorsque le drapeau est faux |
import threading
import time
event = threading.Event()
def lighter():
'''
flag=True:Lumière verte
flag=False:lumière rouge
'''
count = 0
event.set() #La valeur initiale est la lumière verte
while True:
if 5 < count <= 10:
event.clear() #Fais une lumière rouge
print("\33[41;1m lumière rouge...\033[0m")
elif count > 10:
event.set() #Fais un feu vert
count = 0
else:
print("\33[42;1m feu vert...\033[0m")
time.sleep(1)
count += 1
def car(name):
while True:
if event.is_set(): #Vérifiez si le voyant vert est
print("[{}]Avance...".format(name))
time.sleep(1)
else:
print("[{}]Attendez le signal à cause du feu rouge...".format(name))
event.wait()
# flag=Bloquer ici jusqu'à ce que True
print("[{}]Commencez à avancer en raison du feu vert...".format(name))
light = threading.Thread(target=lighter,)
light.start()
car = threading.Thread(target=car, args=("MINI",))
car.start()
Avec le code ci-dessus, nous avons réalisé une communication simple entre le signal et le fil de la voiture lors de l'événement.
Vous pouvez également utiliser une minuterie pour contrôler le thread par heure.
from threading import Timer
def hello():
print("hello, world")
t = Timer(1, hello)
t.start() #Hello est exécuté après 1 seconde
Il existe également une méthode pour contrôler le thread en jugeant la condition. Les méthodes suivantes sont fournies dans Condition </ code>.
Méthode | La description |
---|---|
wait | Suspendre le fil jusqu'à ce qu'il soit notifié ou lorsque le délai d'expiration de l'argument est atteint |
notify | Fil suspendu (par défaut n=Notifier 1); ne peut être utilisé qu'avec le verrou acquis |
notifyAll | Notifier tous les threads bloqués |
import threading
import time
from random import randint
from collections import deque
class Producer(threading.Thread):
def run(self):
global stocks
while True:
if lock_con.acquire():
products = [randint(0, 100) for _ in range(5)]
stocks.extend(products)
print('Producteur{}Est{}Produit.'.format(self.name, stocks))
lock_con.notify()
lock_con.release()
time.sleep(3)
class Consumer(threading.Thread):
def run(self):
global stocks
while True:
lock_con.acquire()
if len(stocks) == 0:
#Attendez qu'il soit produit lorsque le produit est épuisé
#Suspendre le fil jusqu'à ce qu'il ne soit pas confirmé
lock_con.wait()
print('Client{}Est{}Acheté. Stock: {}'.format(self.name, stocks.popleft(), stocks))
lock_con.release()
time.sleep(0.5)
stocks = deque()
lock_con = threading.Condition()
p = Producer()
c = Consumer()
p.start()
c.start()
Résultat de l'exécution:
Fil du producteur-1 est deque([73, 2, 93, 52, 21])Produit.
Fil client-2 achetés 73. Stock: deque([2, 93, 52, 21])
Fil client-2 achetés 2. Stock: deque([93, 52, 21])
Fil client-2 achetés 93. Stock: deque([52, 21])
Fil client-2 achetés 52. Stock: deque([21])
Fil client-2 achetés 21. Stock: deque([])
Fil du producteur-1 est deque([6, 42, 85, 56, 76])Produit.
Fil client-2 achetés 6. Stock: deque([42, 85, 56, 76])
Fil client-2 achetés 42. Stock: deque([85, 56, 76])
Fil client-2 achetés 85. Stock: deque([56, 76])
Fil client-2 achetés 56. Stock: deque([76])
Fil client-2 achetés 76. Stock: deque([])
C'est un programme simple dans lequel le producteur fabrique 5 produits lorsque le client achète tout le stock.
C'est un contrôle qui est exécuté collectivement lorsque le nombre spécifié de threads passe à travers la barrière. Par exemple, dans un jeu de match en ligne, vous pouvez implémenter une barrière qui attend pendant un certain temps jusqu'à ce que l'équipe atteigne un nombre spécifié de personnes. Les méthodes suivantes sont fournies dans Barrière </ code>.
Méthode | La description |
---|---|
wait | Les threads traversent la barrière; une fois que le nombre spécifié de threads est passé, tous les threads en attente sont libérés |
reset | Vider la barrière; renvoyer BrokenBarrierError au thread en attente |
abort | A brisé la barrière pour rompre l'état; tous les threads actuels sont terminés; retourne BrokenBarrierError aux threads qui tentent de passer à travers la barrière après cela |
import threading
num = 4
def start():
print('{}Depuis que je suis devenu une personne, le jeu a commencé.'.format(num))
lock = threading.Lock()
barrier = threading.Barrier(num, action=start)
class Player(threading.Thread):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def run(self):
try:
if not barrier.broken:
print('{}Participé.'.format(self.name))
barrier.wait(2)
except threading.BrokenBarrierError:
print('Parce que le jeu ne peut pas démarrer{}Est parti.'.format(self.name))
players = []
for i in range(10):
lock = threading.Lock()
p = Player(name='Player {}'.format(i))
players.append(p)
for p in players:
p.start()
Résultat d'exécution
Le joueur 0 a participé.
Le joueur 1 a participé.
Le joueur 2 a participé.
Le joueur 3 a participé.
Le jeu a commencé parce qu'il y avait 4 personnes.
Le joueur 4 a participé.
Le joueur 5 a participé.
Le joueur 6 a participé.
Le joueur 7 a participé.
Le jeu a commencé parce qu'il y avait 4 personnes.
Le joueur 8 a participé.
Le joueur 9 a participé.
Le joueur 8 est parti car le jeu ne peut pas démarrer.
Le joueur 9 est parti car le jeu ne peut pas démarrer.
Les threads sont exécutés de manière aléatoire, ils ne sont donc pas toujours sortis dans l'ordre ci-dessus. Ici, les équipes Joueur 8 et Joueur 9 (barrières) ont été forcées de partir (BrokenBarrierError) car elles n'ont pas atteint le nombre spécifié à temps.
1-7. ThreadLocal J'ai expliqué que parce que les données entre les threads sont partagées, vous devez les verrouiller pour calculer la sortie exacte. Cependant, il arrive que vous souhaitiez que chaque thread traite ses propres variables locales.
import threading
#Créer un objet ThreadLocal dans une portée globale
local_school = threading.local()
def process_student():
#Gagnez des étudiants liés au fil actuel
std = local_school.student
print('Hello, %s (in %s)' % (std, threading.current_thread().name))
def process_thread(name):
#Lier le nom à l'élève dans ThreadLocal
local_school.student = name
process_student()
t1 = threading.Thread(target= process_thread, args=('Alice',), name='Thread-A')
t2 = threading.Thread(target= process_thread, args=('Bob',), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()
Résultat de l'exécution:
Hello, Alice (in Thread-A)
Hello, Bob (in Thread-B)
Le local_school </ code> est ici une variable globale, mais comme il s'agit d'un objet
ThreadLocal </ code>, la variable d'instance
student </ code> peut être définie sans affecter l'autre de chaque thread. Vous pouvez le faire fonctionner. Vous pouvez regarder
local_school </ code> comme un dictionnaire et lier
enseignant </ code> ainsi que
étudiant </ code>. Et chaque thread peut fonctionner de manière arbitraire et ne s’affecte pas. En tant qu'utilisation de
ThreadLocal </ code>, vous pouvez créer votre propre connexion DB, requête http, etc. pour chaque thread. Du point de vue d'un thread, toutes les données reçues sont comme une variable locale et peuvent être manipulées par n'importe quel autre thread.
fork () </ code>. L'appel de fork () </ code> copiera le processus en cours. Le processus copié est appelé processus enfant et le processus d'origine devient son processus parent. La valeur de retour de fork () </ code> est renvoyée à la fois au processus enfant et au processus parent. Et la valeur de retour du processus enfant est 0, et l'ID du processus enfant est retourné dans le processus parent. La raison en est que le processus parent doit enregistrer l'ID du processus enfant. Vous pouvez obtenir l'ID du processus parent du processus enfant avec getppid </ code>.
Le module Python OS </ code> encapsule le système d'appel système.
import os
print('Process (%s) start...' % os.getpid())
# Only works on Unix/Linux/Mac:
pid = os.fork()
if pid == 0:
print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:
print('I (%s) just created a child process (%s).' % (os.getpid(), pid))
Résultat de l'exécution:
Process (19148) start...
I (19148) just created a child process (19149).
I am child process (19149) and my parent is 19148.
Ici, le processus parent et le processus enfant entrent dans des branches conditionnelles différentes. Veuillez noter que Windows n'a pas d'appel système fork () </ code> et ne peut pas être exécuté.
En utilisant fork () </ code>, lorsqu'un processus prend une nouvelle tâche, un nouveau processus peut être créé et traité. Par exemple, le serveur Apache bien connu permet au processus parent de surveiller le port et
fork () </ code> pour laisser le processus enfant gérer les nouvelles requêtes http.
Lors de l'écriture de programmes multi-processus Python, il est recommandé d'utiliser le module multiprocessing </ code> de la bibliothèque standard. Le module
multiprocessing </ code> est un module qui peut être traité en parallèle. On dit aussi que le module
multiprocessing </ code> a été implémenté car le module
threading </ code> ne peut pas être traité en parallèle à cause de GIL.
Le module multiprocessing </ code> est également multiplateforme, vous permettant de créer des programmes multi-processus sous Windows. Comme mentionné ci-dessus, Windows n'a pas de
fork () </ code>, donc lors de la création d'un processus avec le module
multiprocessing </ code>, un pseudo
fork () </ code> Je traite. La façon de faire est de sérialiser tous les objets Python dans le processus parent avec
Pickle </ code> et de les transmettre au processus enfant. Ainsi, si l'appel au module
multiprocessing </ code> échoue sous Windows, le
Pickle </ code> peut avoir échoué.
Si vous souhaitez créer un processus enfant et exécuter une commande externe, vous pouvez utiliser le sous-processus </ code> de la bibliothèque standard, mais ici, d'abord, le traitement Python est effectué dans le module multi-processus
multiprocessing </ code>. Je vais vous présenter la fonction.
Vous pouvez facilement créer des processus enfants à l'aide de processus.
from multiprocessing import Process
import os
#Ce que fait le processus enfant
def run_proc(name):
print('Run child process {} ({})...'.format(name, os.getpid()))
print('Parent process {}.'.format(os.getpid()))
p = Process(target=run_proc, args=('test',))
print('Child process will start.')
p.start()
p.join()
print('Child process end.')
Résultat de l'exécution:
Parent process 19218.
Child process will start.
Run child process test (19219)...
Child process end.
Passez la fonction d'exécution et les arguments à Process </ code>, créez une instance et démarrez-la avec
start </ code>. Vous pouvez facilement créer un processus enfant à partir de
fork () </ code>. En utilisant
join </ code> ici, le processus parent attend que le processus enfant finisse de s'exécuter, tout comme lorsqu'il s'agit d'un thread.
La création d'un processus enfant est très coûteuse en calcul, donc si vous souhaitez créer un grand nombre de processus, il est plus efficace de créer un pool de processus avec Pool </ code>. Les principales méthodes de
Pool </ code> sont les suivantes.
Méthode | La description |
---|---|
apply | Traitement synchrone |
apply_async | Traitement asynchrone |
terminate | Quittez immédiatement |
join | Le processus parent attend la fin du traitement du processus enfant; la jointure de processus ne peut être effectuée qu'après la fermeture ou la fin |
close | Quitter lorsque tous les processus sont terminés |
from multiprocessing import Pool
import os
import time
import random
def long_time_task(name):
print('Run task {} ({})...'.format(name, os.getpid()))
start = time.time()
time.sleep(random.random() * 3)
end = time.time()
print('Task {} runs {} seconds.'.format(name, (end - start)))
print('Parent process {}.'.format(os.getpid()))
p = Pool(4) #Jusqu'à 4 processus enfants en même temps
for i in range(5):
p.apply_async(long_time_task, args=(i,))
#En raison du traitement asynchrone, le processus parent n'a pas à attendre le traitement du processus enfant.
#Faites la prochaine impression
print('Waiting for all subprocesses done...')
p.close()
p.join()
print('All subprocesses done.')
Résultat de l'exécution:
Parent process 19348.
Waiting for all subprocesses done...
Run task 0 (19349)...
Run task 1 (19350)...
Run task 2 (19351)...
Run task 3 (19352)...
Task 1 runs 0.8950300216674805 seconds.
Run task 4 (19350)...
Task 2 runs 1.0132842063903809 seconds.
Task 4 runs 0.3936619758605957 seconds.
Task 3 runs 2.3689510822296143 seconds.
Task 0 runs 2.776203155517578 seconds.
All subprocesses done.
Étant donné que la taille du pool est de 4, la tâche 4 commencera à s'exécuter après la fin de l'une des tâches 0 à 3.
Contrairement aux threads, les données ne sont pas partagées entre les processus. Le système d'exploitation propose de nombreuses méthodes de communication inter-processus. multiprocessing </ code> encapsule les fonctionnalités de bas niveau du système d'exploitation pour une facilité d'utilisation.
Les files d'attente de structure de données FIFO sont souvent utilisées pour la communication inter-processus.
from multiprocessing import Process, Queue
import os
import time
import random
#Ecrire des données dans la file d'attente
def write(q):
print('Process to write: {}'.format(os.getpid()))
for value in ['A', 'B', 'C']:
print('Put {} to queue...'.format(value))
q.put(value)
time.sleep(random.random())
#Lire les données de la file d'attente
def read(q):
print('Process to read: {}'.format(os.getpid()))
while True:
value = q.get(True)
print('Get {} from queue.'.format(value))
#Le processus parent crée une file d'attente et la transmet au processus enfant
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
#Démarrez pw et commencez à écrire
pw.start()
#Commencez à lire et commencez à lire
pr.start()
#Attendez que pw se termine
pw.join()
#pr est une boucle infinie, alors tuez
pr.terminate()
Résultat de l'exécution:
Process to write: 19489
Put A to queue...
Process to read: 19490
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.
Même si la lecture est lente, elle peut être récupérée dans le bon ordre car il s'agit d'un FIFO.
Comme son nom l'indique, les tuyaux peuvent être considérés comme des structures de données en forme de tuyaux. Les données sont transmises en plaçant les données d'un côté du tube (méthode send </ code>) et en recevant les données de l'autre côté (méthode
recv </ code>). Veuillez noter que les données peuvent être corrompues si deux processus placent ou reçoivent des données du même type en même temps.
from multiprocessing import Process, Pipe
def f(conn):
conn.send([42, None, 'hello'])
conn.close()
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
print(parent_conn.recv())
p.join()
Résultat de l'exécution:
[42, None, 'hello']
J'ai expliqué que les données entre les processus ne sont pas partagées, mais c'est en fait un mensonge ...
En fonction du système d'exploitation, [mémoire partagée] entre les processus (https://ja.wikipedia.org/wiki/%E5%85%B1%E6%9C%89%E3%83%A1%E3%83%A2% E3% 83% AA #% E3% 82% BD% E3% 83% 95% E3% 83% 88% E3% 82% A6% E3% 82% A7% E3% 82% A2% E3% 81% AB% E3 % 82% 88% E3% 82% 8B% E5% 85% B1% E6% 9C% 89% E3% 83% A1% E3% 83% A2% E3% 83% AA) peuvent être réalisés. En Python, Value </ code> et
Array </ code> vous permettent de stocker des données numériques et des dates de tableau dans la mémoire partagée. En passant,
Value </ code> et
Array </ code> utilisent la structure de données du langage C telle quelle. Les [nombres (héritant de la classe des nombres)] de Python (https://docs.python.org/ja/3/library/numbers.html) sont immuables et ne peuvent pas être réécrits directement. ..
from multiprocessing import Process, Value, Array
def f(n, a):
n.value = 3.1415927
for i in range(len(a)):
a[i] = -a[i]
num = Value('d', 0.0) #numéro de type double
arr = Array('i', range(10)) #Tableau
p = Process(target=f, args=(num, arr))
p.start()
p.join()
print(num.value)
print(arr[:])
Résultat de l'exécution:
3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
Il peut être plus exact de dire que le gestionnaire partage les données plutôt que de les transmettre. Manager () </ code> renvoie un objet gestionnaire et crée un processus serveur. Grâce au processus serveur, d'autres processus peuvent travailler avec des objets Python de manière proxy. Les objets Manager prennent en charge les objets Python
list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value, Array </ code>.
from multiprocessing import Process, Manager
def f(d, l, i):
d[i] = i
d[str(i)] = str(i)
l.append(i)
print(l)
with Manager() as manager:
shared_dict = manager.dict()
shared_list = manager.list()
p_list = []
#Créer 10 processus
for i in range(10):
p = Process(target=f, args=(shared_dict, shared_list, i))
p.start()
p_list.append(p)
for p in p_list:
p.join()
print('All subprocesses done.')
print(shared_dict)
print(shared_list)
Résultat de l'exécution:
[0]
[0, 1]
[0, 1, 2]
[0, 1, 2, 3]
[0, 1, 2, 3, 4]
[0, 1, 2, 3, 4, 5]
[0, 1, 2, 3, 4, 5, 6]
[0, 1, 2, 3, 4, 5, 6, 8]
[0, 1, 2, 3, 4, 5, 6, 8, 7]
[0, 1, 2, 3, 4, 5, 6, 8, 7, 9]
All subprocesses done.
{0: 0, '0': '0', 1: 1, '1': '1', 2: 2, '2': '2', 3: 3, '3': '3', 4: 4, '4': '4', 5: 5, '5': '5', 6: 6, '6': '6', 8: 8, '8': '8', 7: 7, '7': '7', 9: 9, '9': '9'}
[0, 1, 2, 3, 4, 5, 6, 8, 7, 9]
J'ai essayé de créer une liste et un dictionnaire de partage inter-processus dans le gestionnaire. Ici, vous pouvez voir que les processus ne sont pas en séquence.
Comme les threads, les processus ont des verrous.
from multiprocessing import Process, Lock
def f(i):
lock.acquire()
try:
print('hello world', i)
finally:
lock.release()
lock = Lock()
for num in range(10):
Process(target=f, args=(num,)).start()
Résultat de l'exécution:
hello world 0
hello world 1
hello world 2
hello world 3
hello world 4
hello world 5
hello world 6
hello world 7
hello world 8
hello world 9
En raison du verrouillage, les nombres sont affichés dans l'ordre contrairement à la fois précédente. Cependant, vous ne pourrez pas démontrer les performances du multi-processus.
Les processus Python peuvent être un traitement de processus distribué utilisant plusieurs machines. Le sous-module managers </ code> du module
multiprocessing </ code> peut répartir les processus sur plusieurs machines. Même si vous ne connaissez pas le protocole de communication, vous pouvez écrire un programme pour le traitement de processus distribué.
Le traitement de processus distribué nécessite un processus serveur qui distribue les tâches et un processus de travail qui traite réellement les tâches. Tout d'abord, implémentez le processus serveur task_master.py </ code>.
Ici, managers </ code> publie la file d'attente sur Internet en tant que ** api **. Une fois que le processus serveur a démarré la file d'attente et mis la tâche, il est accessible à partir d'autres machines.
task_master.py
import random
import queue #Comme c'est via le net, la file d'attente standard de la bibliothèque est suffisante
from multiprocessing.managers import BaseManager
#File d'attente pour envoyer des tâches
task_queue = queue.Queue()
#File d'attente pour recevoir les résultats
result_queue = queue.Queue()
class QueueManager(BaseManager):
pass
#Enregistrer deux files d'attente en tant qu'API
#Dans le cas de Windows, lambda peut être utilisé pour l'enregistrement de l'API, veuillez donc définir la fonction de manière obéissante
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)
#Utilisez le port 5000 pour le chiffrement d'authentification'abc'À
#Pour Windows, l'adresse doit être spécifiée (127).0.0.1)
manager = QueueManager(address=('', 5000), authkey=b'abc')
#commencer
manager.start()
#Obtenir un objet de file d'attente via net
task = manager.get_task_queue()
result = manager.get_result_queue()
#Essayez de mettre dans une tâche
for i in range(10):
n = random.randint(0, 10000)
print('Put task {}...'.format(n))
task.put(n)
#recevoir le résultat de la file d'attente des résultats
print('Try get results...')
for i in range(10):
#S'il dépasse 10 secondes, il se termine avec un délai d'expiration
r = result.get(timeout=10)
print('Result: {}'.format(r))
#Fin
manager.shutdown()
print('master exit.')
Ensuite, implémentez le task_worker.py </ code> pour le processus de travail. Obtenez la tâche avec ** api ** appelée
manager.get_task_queue </ code> publiée ci-dessus et traitez-la.
task_worker.py
import time
import queue
from multiprocessing.managers import BaseManager
#Créer le même gestionnaire de files d'attente
class QueueManager(BaseManager):
pass
#Obtenez l'API sur le net et enregistrez-la dans le gestionnaire de files d'attente
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')
#Connectez-vous au serveur
server_addr = '127.0.0.1'
print('Connect to server {}...'.format(server_addr))
#Définissez le même port et le même cryptage d'authentification
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
#Lien
m.connect()
#Obtenez chaque file d'attente
task = m.get_task_queue()
result = m.get_result_queue()
#Recevoir une tâche de la file d'attente des tâches
#Stocker le résultat du traitement dans la file d'attente des résultats
for i in range(10):
try:
n = task.get(timeout=1)
#Ici, la tâche est un simple calcul carré.
print('run task {} * {}...'.format(n, n))
r = '{} * {} = {}'.format(n, n, n*n)
time.sleep(1)
result.put(r)
except queue.Empty:
print('task queue is empty.')
#Fin
print('worker exit.')
Il peut également être exécuté sur une machine locale.
Résultat de l'exécution:
Tout d'abord, le processus serveur place d'abord la tâche dans la task_queue </ code>. Une fois que tout est entré, attendez les résultats dans
result_queue </ code>.
task_master.py
Put task 7710...
Put task 6743...
Put task 8458...
Put task 2439...
Put task 1351...
Put task 9885...
Put task 5532...
Put task 4181...
Put task 6093...
Put task 3815...
Try get results...
Le processus de travail se connecte ensuite au serveur, récupère la tâche dans la task_queue </ code> et la traite. Le résultat du traitement est envoyé à
result_queue </ code>.
task_worker.py
Connect to server 127.0.0.1...
run task 7710 * 7710...
run task 6743 * 6743...
run task 8458 * 8458...
run task 2439 * 2439...
run task 1351 * 1351...
run task 9885 * 9885...
run task 5532 * 5532...
run task 4181 * 4181...
run task 6093 * 6093...
run task 3815 * 3815...
worker exit.
Lorsque le résultat arrive dans result_queue </ code>, le processus serveur le sort dans l'ordre.
task_master.py
Put task 7710...
Put task 6743...
Put task 8458...
Put task 2439...
Put task 1351...
Put task 9885...
Put task 5532...
Put task 4181...
Put task 6093...
Put task 3815...
Try get results...
Result: 7710 * 7710 = 59444100
Result: 6743 * 6743 = 45468049
Result: 8458 * 8458 = 71537764
Result: 2439 * 2439 = 5948721
Result: 1351 * 1351 = 1825201
Result: 9885 * 9885 = 97713225
Result: 5532 * 5532 = 30603024
Result: 4181 * 4181 = 17480761
Result: 6093 * 6093 = 37124649
Result: 3815 * 3815 = 14554225
master exit.
Toutes les files d'attente sont dans le processus serveur car le processus de travail ne crée pas la file d'attente.
(Source: [réseau gouvernemental de type Hiroyukimine](https://www.liaoxuefeng.com/wiki/1016959663602400/1017631559645600))De cette manière, les processus distribués peuvent être réalisés en Python. Une puissance de calcul puissante peut être obtenue en utilisant plusieurs travailleurs.
fork () </ code> a été expliqué pour faire une copie du processus actuel en tant que processus enfant. Autrement dit, appeler os.fork </ code> en Python crée un processus enfant de votre programme Python. Cependant, il y a des moments où vous avez besoin d'un processus enfant capable d'exécuter des commandes externes plutôt qu'un programme Python.
Il existe un autre appel système exec () </ code> dans le système d'exploitation basé sur Unix. Il est implémenté en tant que os.execve </ code> en Python. exec () </ code> est une fonction qui remplace actuellement le processus par un autre programme. C'est-à-dire que os.fork </ code> crée un processus enfant d'un programme Python, et os.execve </ code> utilise d'autres programmes ( ls </ code>, ls </ code>, qui peuvent être exécutés dans le shell. Il peut être remplacé par un programme comme code> ping </ code>.
Le sous-processus </ code> de la bibliothèque standard est un module permettant de créer des processus enfants qui exécutent des programmes externes. Ensuite, lors de l'exécution d'un programme externe avec sous-processus </ code>, créez un tube (Pipe) pour la communication inter-processus entre le processus Python et le processus enfant, transmettez les paramètres et envoyez les valeurs de retour et les erreurs. Vous pourrez le recevoir.
3-1. subprocess.run
À partir de Python 3.5, il est officiellement recommandé d'exécuter la commande dans subprocess.run </ code>. Ici, l'explication telle que
subprocess.call </ code> de l'ancienne ** api ** est omise.
subprocess.run(args, *, stdin=None, input=None,
stdout=None, stderr=None, shell=False, timeout=None, check=False, universal_newlines=False)
subprocess.run </ code> renvoie une instance de la classe
CompletedProcess </ code>. Les attributs de la classe
CompletedProcess </ code> sont les suivants.
attribut | La description |
---|---|
args | Paramètres passés aux processus enfants; chaîne ou liste |
returncode | Stocke le code d'état après l'exécution |
stdout | Sortie standard après exécution |
stderr | Erreur standard après exécution |
check_returncode() | Déclenche CalledProcessError lorsque le code d'état est différent de zéro (échec d'exécution) |
Voici quelques exemples d'utilisation de subprocess.run </ code>.
Vous pouvez capturer la sortie standard avec subprocess.PIPE </ code> (sinon la sortie sera supprimée).
import subprocess
# subprocess.run(["ls", "-l"] stdout=subprocess.PIPE)Pareil que
obj = subprocess.run(["ls", "-l"], stdout=subprocess.PIPE)
print('stdout:\n{}'.format(obj.stdout.decode()))
Résultat de l'exécution:
stdout:
total 128
-rw-r--r--@ 1 kaito staff 692 Feb 16 19:35 1-1.py
-rw-r--r--@ 1 kaito staff 509 Feb 17 23:39 1-2.py
-rw-r--r--@ 1 kaito staff 364 Feb 19 16:48 2-10.py
-rw-r--r--@ 1 kaito staff 645 Feb 19 19:12 2-17.py
-rw-r--r--@ 1 kaito staff 213 Feb 19 19:14 2-18.py
-rw-r--r--@ 1 kaito staff 209 Feb 19 19:18 2-19.py
-rw-r--r--@ 1 kaito staff 318 Feb 19 23:53 2-20.py
-rw-r--r--@ 1 kaito staff 194 Feb 19 23:57 2-21.py
-rw-r--r--@ 1 kaito staff 230 Feb 20 15:46 2-23.py
-rw-r--r--@ 1 kaito staff 131 Feb 18 19:39 2-4.py
-rw-r--r--@ 1 kaito staff 543 Feb 18 19:50 2-8.py
-rw-r--r--@ 1 kaito staff 240 Feb 18 22:29 2-9.py
-rw-r--r-- 1 kaito staff 1339 Feb 27 00:25 task_master.py
-rw-r--r-- 1 kaito staff 1086 Feb 27 00:31 task_worker.py
-rw-r--r-- 1 kaito staff 446 Feb 27 20:26 test.py
-rw-r--r-- 1 kaito staff 199 Feb 27 20:31 test2.py
Si check </ code> est défini sur True, une erreur se produit lorsque le code d'état est différent de zéro.
subprocess.run("exit 1", shell=True, check=True)
Résultat de l'exécution:
Traceback (most recent call last):
File "test2.py", line 4, in <module>
subprocess.run("exit 1", shell=True, check=True)
File "/Users/kaito/opt/miniconda3/lib/python3.7/subprocess.py", line 487, in run
output=stdout, stderr=stderr)
subprocess.CalledProcessError: Command 'exit 1' returned non-zero exit status 1.
Le \ _ \ _ repr \ _ \ _ </ code> de la classe
CompletedProcess </ code> ressemble à ceci.
print(subprocess.run(["ls", "-l", "/dev/null"], stdout=subprocess.PIPE))
Résultat de l'exécution:
CompletedProcess(args=['ls', '-l', '/dev/null'], returncode=0, stdout=b'crw-rw-rw- 1 root wheel 3, 2 Feb 27 20:37 /dev/null\n')
3-2. subprocess.Popen
Pour les opérations avancées, vous pouvez utiliser la classe subprocess.Popen </ code>.
class subprocess.Popen(args, bufsize=-1, executable=None, stdin=None, stdout=None, stderr=None,
preexec_fn=None, close_fds=True, shell=False, cwd=None, env=None, universal_newlines=False,
startup_info=None, creationflags=0, restore_signals=True, start_new_session=False, pass_fds=())
La méthode de la classe subprocess.Popen </ code> est la suivante.
Méthode | La description |
---|---|
poll | Renvoie un code d'état à la fin de l'exécution du processus enfant; renvoie None s'il n'est pas terminé |
wait | Attendez la fin de l'exécution du processus enfant; déclenchez une erreur TimeoutExpired lorsque le délai d'expiration se produit |
communicate | Communiquer avec les processus enfants |
send_signal | Envoyer un signal à un processus enfant, par exemple un signal.signal(signal.SIGINT)Est la ligne de commande du système d'exploitation UNIX, Ctrl+Signal en appuyant sur C |
terminate | Terminer le processus enfant |
kill | Tuer le processus enfant |
Voici quelques exemples d'utilisation de subprocess.Popen </ code>.
Vous pouvez exécuter votre code Python en tant que programme externe.
import subprocess
#Connectez les tuyaux aux entrées standard, aux sorties standard, aux erreurs standard
p = subprocess.Popen(["python"], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
#Ecrire des données sur l'entrée standard
p.stdin.write(b'print("stdin")\n')
#Passer des données comme entrée pour communiquer
out, err = p.communicate(input=b'print("communicate")\n')
print(out.decode())
Résultat de l'exécution:
stdin
communicate
Le traitement de pipeline à l'aide de | </ code> peut être construit en connectant la sortie standard et l'entrée standard de deux processus enfants avec un tube.
#Conduisez les deux processus enfants ensemble
p1 = subprocess.Popen(['df', '-h'], stdout=subprocess.PIPE)
p2 = subprocess.Popen(['grep', 'Data'], stdin=p1.stdout, stdout=subprocess.PIPE)
out, err = p2.communicate() # df -h | grep Data
print(out.decode())
Résultat de l'exécution:
/dev/disk1s1 466Gi 438Gi 8.0Gi 99% 1156881 4881295959 0% /System/Volumes/Data
map auto_home 0Bi 0Bi 0Bi 100% 0 0 100% /System/Volumes/Data/home
Cependant, l'évolution de Python ne s'est pas encore arrêtée. Un module de haut niveau appelé concurrent </ code> qui encapsule davantage le
threading </ code> et le
multiprocessing </ code> pour faciliter son utilisation est actuellement en cours de développement.
Le concurrent </ code> actuel n'a qu'un module appelé
futures </ code>.
futures </ code> est [Modèle futur](https://ja.wikipedia.org/wiki/Future_%E3%83%91%E3%82%BF%E3%83%BC%E3%83% B3) Implémentation Python. Ici, je voudrais présenter les fonctions qui peuvent être utilisées à ce moment.
concurrent.futures </ code> fournit
ThreadPoolExecutor </ code> et
ProcessPoolExecutor </ code>, qui héritent de la classe
Executor </ code>. Devenir.
ThreadPoolExecutor </ code> et
ProcessPoolExecutor </ code> reçoivent un argument appelé
max_works </ code> qui spécifie le nombre de threads ou de processus. Effectue une seule tâche avec la méthode
submit </ code> et retourne une instance de la classe
Future </ code>.
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import requests
def load_url(url):
return requests.get(url)
if __name__ == '__main__':
url = 'https://www.python.org/'
executor = ProcessPoolExecutor(max_workers=4) # ThreadPoolExecutor(max_workers=4)
future = executor.submit(load_url, url)
print(future)
while 1:
if future.done():
print('status code: {}'.format(future.result().status_code))
break
Résultat de l'exécution:
<Future at 0x10ae058d0 state=running>
status code: 200
Une simple requête http. Notez que lorsque vous utilisez ProcessPoolExecutor </ code>, le module
\ _ \ _ main__ </ code> est requis, ne l'exécutez donc pas dans un environnement REPL.
The __main__ module must be importable by worker subprocesses. This means that ProcessPoolExecutor will not work in the interactive interpreter.
La méthode submit </ code> ne peut exécuter qu'une seule tâche, donc si vous souhaitez exécuter plusieurs tâches,
map </ code>,
as_completed </ code> et
wait < Utilisez / code>.
La méthode map </ code> prend une fonction d'exécution et une séquence comme arguments et renvoie un générateur de résultats d'exécution.
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import requests
URLS = ['https://google.com', 'https://www.python.org/', 'https://api.github.com/']
def load_url(url):
return requests.get(url)
if __name__ == '__main__':
# with ThreadPoolExecutor(max_workers=4) as executor:
with ProcessPoolExecutor(max_workers=4) as executor:
for url, data in zip(URLS, executor.map(load_url, URLS)):
print('{} - status_code {}'.format(url, data.status_code))
Résultat de l'exécution:
https://google.com - status_code 200
https://www.python.org/ - status_code 200
https://api.github.com/ - status_code 200
La méthode as_completed </ code> renvoie un générateur pour l'objet
Future </ code>. Et il se bloque lorsque la tâche n'est pas terminée.
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
import requests
URLS = ['https://google.com', 'https://www.python.org/', 'https://api.github.com/']
def load_url(url):
return url, requests.get(url).status_code
if __name__ == '__main__':
with ProcessPoolExecutor(max_workers=4) as executor:
tasks = [executor.submit(load_url, url) for url in URLS]
for future in as_completed(tasks):
print(*future.result())
Résultat de l'exécution:
https://google.com 200
https://www.python.org/ 200
https://api.github.com/ 200
La méthode wait </ code> bloque le thread principal et le processus principal. Vous pouvez définir trois conditions avec l'argument
return_when </ code>.
conditions | La description |
---|---|
ALL_COMPLETED | Libérez le blocage lorsque toutes les tâches sont terminées |
FIRST_COMPLETED | Libérer le blocage lorsqu'une tâche est terminée |
FIRST_EXCEPTION | Libérez le blocage si une tâche provoque une erreur |
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, wait, ALL_COMPLETED
import requests
URLS = ['https://google.com', 'https://www.python.org/', 'https://api.github.com/']
def load_url(url):
requests.get(url)
print(url)
if __name__ == '__main__':
with ProcessPoolExecutor(max_workers=4) as executor:
tasks = [executor.submit(load_url, url) for url in URLS]
wait(tasks, return_when=ALL_COMPLETED)
print('all completed.') #Après 3 impressions, le processus principal est libéré pour imprimer
Résultat de l'exécution:
https://www.python.org/
https://api.github.com/
https://google.com
all completed.
Exécution parallèle threading --- Threading traitement parallèle multitraitement --- Traitement parallèle basé sur les processus sous-processus --- Gestion des sous-processus concurrent.futures - Exécution de tâches parallèles
Recommended Posts