Ne passez pas self à ProcessPoolExecutor en classe

environnement

Python 3.7.4 Windows 10

ProcessPoolExecutor s'arrête

Une erreur s'est produite lors de l'utilisation de ProcessPoolExecutor pour le traitement parallèle. Ci-dessous le code.

import concurrent
import concurrent.futures

class Process:
    def __init__(self):
        self.process_list = []
        self.executor = concurrent.futures.ProcessPoolExecutor(max_workers=4)
        # self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)  #Fonctionne avec le traitement des threads
    
    def _process(self, n):
        #Unité de traitement
        return 1
    
    def start_process(self, n):
        #Département d'exécution
        self.process_list.append(self.executor.submit(self._process, n)) 
    
    def get_result(self):
        #Obtenir des résultats
        print("wait...")
        concurrent.futures.wait(self.process_list, return_when=concurrent.futures.FIRST_EXCEPTION)
        print("all processes were finished")
        res_list = [res.result() for res in self.process_list]
        print("got result")
        self.executor.shutdown(wait=True)
        print("shutdown")
        self.process_list = []
        return res_list

if __name__ == "__main__":
    process = Process()
    for i in range(10):
        process.start_process(i)
    result = process.get_result()
    print(result)

Quand j'exécute ce code, il dit res_list = [res.result () pour res dans self.process_list]

TypeError: can't pickle _thread.RLock objects

Erreur se produit.

À propos, ThreadPoolExecutor fonctionne.

Solutions

Partie de traitement

def _process(self, n):
    #Unité de traitement
    return 1

De

@staticmethod
def _process(n):
    #Unité de traitement
    return 1

Changer pour. Si l'unité de traitement utilise la variable d'instance d'elle-même (self), ajoutez-la à l'argument.

De plus, si vous utilisiez votre propre méthode d'instance, vous devez remplacer cette méthode par une méthode qui ne prend pas self comme argument (méthode statique, méthode de classe, etc.).

Je pense que ce n'est pas grave si vous ne passez pas un objet qui contient un objet qui ne peut pas être picklé (ProcessPoolExecutor, queue.Queue, threading.Lock, threading.RLock, etc.) comme argument de la méthode transmise à ProcessPoolExecutor.

Cause

Ce problème est dû au fait que l'instance de classe contient un ProcessPoolExecutor, un objet qui ne peut pas être picklé, et est passé à plusieurs processus via l'argument self de la méthode d'instance.

Selon la documentation ProcessPoolExecutor

ProcessPoolExecutor uses the multiprocessing module, which allows it to side-step the Global Interpreter Lock but also means that only picklable objects can be executed and returned. (Source: https://docs.python.org/ja/3/library/concurrent.futures.html#processpoolexecutor)

Tel qu'il est écrit, il semble que seuls les objets qui peuvent être décapés peuvent être exécutés et retournés.

Par conséquent, cela peut être évité en n'utilisant pas la méthode d'instance (en utilisant une méthode qui ne prend pas self comme argument).

Exemple de modification

Ceci est un exemple de modification lors de l'utilisation de variables d'instance. «self.calc» et «self.hoge» sont des variables d'instance.

"""
Si vous utilisiez des variables d'instance
"""
import concurrent
import concurrent.futures

class Calc:
    def __init__(self, a):
        self.a = a
    
    def calc(self, n):
        return self.a + n

class Process:
    def __init__(self):
        self.process_list = []
        self.executor = concurrent.futures.ProcessPoolExecutor(max_workers=4)
        # self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
        self.calc = Calc(10)  #Instance de la classe à traiter
        self.hoge = 3  #Variable d'instance
    
    def _process_bad(self, n):
        res = self.calc.calc(n) * self.hoge
        return res
    
    @staticmethod
    def _process(calc, n, hoge):
        res = calc.calc(n) * hoge
        return res
    
    def start_process(self, n):
        #Département d'exécution
        # self.process_list.append(self.executor.submit(self._process_bad, n))  # NG
        self.process_list.append(self.executor.submit(self._process, self.calc, n, self.hoge))  # OK
    
    def get_result(self):
       #réduction

if __name__ == "__main__":
    process = Process()
    for i in range(10):
        process.start_process(i)
    result = process.get_result()
    print(result)

Si vous l'avez écrit comme _process_bad (), vous devez passer toutes les variables utilisées dans la méthode comme arguments comme _process ().

De plus, la classe Calc ne doit pas contenir d'objets qui ne peuvent pas être décapés.

À propos, en plus d'utiliser la méthode statique, cela fonctionne même si vous utilisez une méthode de classe ou appelez une méthode en dehors de la classe.

#Exemple de méthode de classe
@classmethod
def _process(cls, calc, n, hoge):
    res = calc.calc(n) * hoge
    return res

en conclusion

Lorsque j'ai rencontré ce problème, j'ai été arrêté avant d'obtenir les résultats, comme indiqué ci-dessous.

def get_result(self):
    #Obtenir des résultats
    self.executor.shutdown(wait=True)  #Arrêtez avant d'obtenir
    res_list = [res.result() for res in self.process_list]
    self.process_list = []
    return res_list

Cela provoquera son arrêt sans afficher de message d'erreur.

Aussi, j'ai mal compris qu'il n'y a pas de problème si je ne le mets même pas dans la valeur de retour pour les choses qui ne peuvent pas être marinées, donc je n'ai pas compris la cause pendant un moment.

C'était un processus multi-processus que j'ai écrit souffrant d'erreurs, mais le processus que je voulais faire avait un temps d'exécution qui n'était pas très différent du traitement multi-thread.

Lors de l'utilisation d'une bibliothèque externe, il semble que plusieurs processeurs puissent être utilisés même pour le traitement multi-thread.

référence

[1] https://bugs.python.org/issue29423 (using concurrent.futures.ProcessPoolExecutor in class giving 'TypeError: can't pickle _thread.RLock objects' in 3.6, but not 3.5) [2] https://qiita.com/walkure/items/2751b5b8932873e7a5bf (L'expression lambda ne peut pas être transmise à func dans ProcessPoolExector.map) [3] https://qiita.com/kokumura/items/2e3afc1034d5aa7c6012 (mémo d'utilisation concurrent.futures) [4] https://docs.python.org/ja/3.6/library/concurrent.futures.html (17.4. Concurrent.futures - Exécution de tâches parallèles) [5] https://qiita.com/walkure/items/2751b5b8932873e7a5bf (Impossible de transmettre l'expression lambda à func dans ProcessPoolExector.map) [6] https://qiita.com/kaitolucifer/items/e4ace07bd8e112388c75#4-concurrentfutures (Compréhension complète du thread Python et du multitraitement)

Recommended Posts

Ne passez pas self à ProcessPoolExecutor en classe
Pour faire l'équivalent de Ruby ObjectSpace._id2ref en Python
Que faire quand n'est pas dans le fichier sudoers. Cet incident sera signalé.
Que faire si la barre de progression n'est pas affichée dans tqdm de python
Que faire lorsque Python ne passe pas de la version système dans pyenv
Comment utiliser la méthode __call__ dans la classe Python
Comment définir l'attribut de classe html dans le formulaire forms.py de Django
Programmation pour combattre dans le monde ~ 5-1
Programmation pour combattre dans le monde ~ 5-5,5-6
Programmer pour combattre dans le monde 5-3
[Python] Comment faire PCA avec Python
Programmation pour combattre dans le monde - Chapitre 4
Que faire lorsque les paramètres de l'extension jupyterlab ne sont pas reflétés
Tous les éléments bougent (ne restent pas dans la même position) shuffle
Transmettez des arguments à Task dans discord.py
Je veux réussir le test G dans un mois Jour 1
Essayez Cython dans les plus brefs délais
Que faire lorsque le type de valeur est ambigu en Python?
Programmation pour combattre dans le monde ~ 5-2
Que faire si le nom d'utilisateur est modifié et que le chemin de la bibliothèque pyenv ne passe pas
Que faire lorsque le résultat téléchargé via scrapy est en anglais
Je veux expliquer en détail la classe abstraite (ABCmeta) de Python
Comment masquer les avertissements qui n'affectent pas l'exécution dans Jupyter Notebook
Que faire lorsque l'avertissement "L'environnement est cohérent ..." apparaît dans l'environnement Anaconda
[Introduction à Python] Comment utiliser la classe en Python?
Comment faire R chartr () en Python
Dans Jupyter, ajoutez IPerl au noyau.
Comment utiliser __slots__ dans la classe Python
Divers commentaires à écrire dans le programme
[Matplotlib] N'inclinez pas l'étiquette de l'axe
Comment passer le résultat de l'exécution d'une commande shell dans une liste en Python
Exemple de ce qu'il faut faire lorsque l'exemple de script ne fonctionne pas (OpenCV-Python)
Comment passer le chemin vers la bibliothèque construite avec pyenv et virtualenv avec PyCharm
Comment identifier de manière unique la source d'accès dans la vue de classe générique Django
[Python] A créé une classe pour jouer des vagues de péché en arrière-plan avec pyaudio
Que faire lorsque "TypeError: type de données non compris" apparaît dans numpy.zeros de python
Que faire si vous obtenez «Python non configuré». Utilisation de PyDev dans Eclipse
Que faire si NotADirectoryError: [Errno 20] Pas un répertoire: 'xdg-settings' apparaît dans le notebook jupyter
Que faire si une erreur de version se produit dans le pilote Selenium Chrome
Je n'ai pas eu besoin d'écrire décorateur en classe Merci contextmanager
Comment utiliser la bibliothèque C en Python
Connectez-vous à un serveur distant avec SSH
Que faire si pipreqs aboutit à UnicodeDecodeError
Essayez de résoudre le problème de l'héritage de classe Python
Implémentation minimale d'Union Find en Python
Client de streaming Twitter à apprécier dans le terminal
Pour remplacer dynamiquement la méthode suivante en python
Dessinez des graphiques dans Julia ... Laissez les graphiques à Python
Comment transmettre les paramètres au pipeline d'articles dans Scrapy
Conseils pour rédiger un aplatissement concis en python
Passer les informations de connexion à afficher dans Django
Comment obtenir les fichiers dans le dossier [Python]
PyQtGraph peut ne pas être disponible dans l'interpréteur.
Que faire pour obtenir une feuille de calcul Google en Python
Connectez-vous avec json en utilisant pygogo.
Je veux afficher la progression en Python!
Tutoriel "Cython" pour rendre Python explosif: Passez l'objet de classe C ++ à l'objet de classe côté Python. Partie 2
[Jinja2] Solution au problème que les variables ajoutées dans l'instruction for ne sont pas héritées
Que faire lorsque seule la fenêtre est affichée et que rien ne s'affiche dans le pygame