Référencé: https://www.bettercodebytes.com/theadpoolexecutor-with-a-bounded-queue-in-python/
Vous pouvez ajuster le degré de parallélisme en spécifiant max_workers dans Executor, mais ce qui se passe si vous soumettez à un rythme qui dépasse le nombre de parallèles ne bloque pas. Au lieu de cela, il semble stocker en mémoire. En raison de ce comportement, l'exécution en grand nombre peut consommer beaucoup de mémoire.
with ThreadPoolExecutor(max_workers=10) as executor:
for i in range(0, 1024*1024): #Beaucoup
executor.submit(fn, i) #faire
#La boucle for se termine bientôt, mais la consommation de mémoire est censée être excellente
En fait, écrire du code qui boucle 1 million consomme environ 2 Go de mémoire. J'ai donc décidé de réfléchir à la manière d'y faire face.
Après vérification de l'implémentation interne, ThreadPoolExecutor a une file d'attente en interne, Submit crée un objet appelé WorkItem et le met dans la file d'attente. Cette file d'attente interne n'a pas de limite supérieure et ne peut jamais être bloquée, vous pouvez donc soumettre à l'infini.
À propos, le thread de travail est créé au moment de l'emballage, [le thread de travail récupère les données de la file d'attente et les exécute dans une boucle infinie](https://github.com/python/cpython/ blob / v3.8.6 / Lib / concurrent / futures / thread.py # L66).
Observons réellement le mouvement. Par exemple, exécutez une fonction qui prend 0,01 seconde 5000 fois. Tournons ceci avec max_workers = 10.
Regardez l'horodatage et la mémoire (maxrss cette fois) en tant que progression dans l'instruction for.
https://wandbox.org/permlink/n2P2CQssjhj1eOFw
À partir de l'horodatage, vous pouvez voir que le blocage ne s'est pas produit lors de la soumission (le processus de soumission de la boucle for est terminé immédiatement et il attend presque l'arrêt) Cependant, vous pouvez voir que la consommation de mémoire augmente à mesure que le processus progresse.
C'est la première méthode à laquelle j'ai pensé. Fait de la file d'attente utilisée dans ThreadPool Executor une file d'attente dimensionnée. Héritez et remplacez les variables d'instance.
https://wandbox.org/permlink/HJN0lRBR0VBYU0Pv
Vous pouvez voir à partir de l'horodatage que le blocage se produit pendant la boucle. Cependant, le temps total ne change pas beaucoup et la consommation de mémoire est très lente.
Le code est simple, mais il semble un peu bâclé d'entrer dans l'implémentation interne, et ProcessPoolExecutor n'a pas ces files d'attente, donc cette méthode ne fonctionne pas.
Puisque le plan 1 n'était pas assez bon, je cherchais quelque chose que je pourrais faire et j'ai trouvé l'article de référence.
https://www.bettercodebytes.com/theadpoolexecutor-with-a-bounded-queue-in-python/
Créez une classe BoundedExecutor qui encapsule PoolExecutor en faisant référence à la source de référence. Comme il est compatible avec l'API (autre que la carte), il peut être utilisé en remplacement.
L'implémentation interne contrôle la concurrence en décomptant le sémaphore au moment de la soumission et en comptant le sémaphore lorsque le traitement de travail est terminé. "Lorsque le traitement de travail est terminé" est "lorsque la fonction enregistrée par add_done_callback de future est appelée lorsqu'elle est terminée". (Le rappel est appelé lorsque le traitement du travailleur est terminé et lorsque l'exception de levée se produit, donc Tsuji doit correspondre.)
https://wandbox.org/permlink/jn4nN8leJonLi2ty
Cela a également donné le même résultat que dans le plan 1.
Au fait, il est préférable de décider de la taille de la file d'attente pour qu'elle soit plus grande que max_workers (dans le code, donnez ou modifiez l'argument pour que bounded_ratio = 1 devienne bounded_ratio = 2) Si vous définissez "Nombre de parallèles == Taille de la file d'attente", il y aura un moment où la file d'attente sera vide, les travailleurs joueront et l'achèvement global sera légèrement retardé. Par conséquent, il vaut mieux l'augmenter un peu.
https://wandbox.org/permlink/HPrJXNGxLeXzB1x2
Recommended Posts