Python 3.7.4 Windows 10
Une erreur s'est produite lors de l'utilisation de ProcessPoolExecutor pour le traitement parallèle. Ci-dessous le code.
import concurrent
import concurrent.futures
class Process:
def __init__(self):
self.process_list = []
self.executor = concurrent.futures.ProcessPoolExecutor(max_workers=4)
# self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=4) #Fonctionne avec le traitement des threads
def _process(self, n):
#Unité de traitement
return 1
def start_process(self, n):
#Département d'exécution
self.process_list.append(self.executor.submit(self._process, n))
def get_result(self):
#Obtenir des résultats
print("wait...")
concurrent.futures.wait(self.process_list, return_when=concurrent.futures.FIRST_EXCEPTION)
print("all processes were finished")
res_list = [res.result() for res in self.process_list]
print("got result")
self.executor.shutdown(wait=True)
print("shutdown")
self.process_list = []
return res_list
if __name__ == "__main__":
process = Process()
for i in range(10):
process.start_process(i)
result = process.get_result()
print(result)
Quand j'exécute ce code, il dit res_list = [res.result () pour res dans self.process_list]
TypeError: can't pickle _thread.RLock objects
Erreur se produit.
À propos, ThreadPoolExecutor fonctionne.
Partie de traitement
def _process(self, n):
#Unité de traitement
return 1
De
@staticmethod
def _process(n):
#Unité de traitement
return 1
Changer pour. Si l'unité de traitement utilise la variable d'instance d'elle-même (self), ajoutez-la à l'argument.
De plus, si vous utilisiez votre propre méthode d'instance, vous devez remplacer cette méthode par une méthode qui ne prend pas self comme argument (méthode statique, méthode de classe, etc.).
Je pense que ce n'est pas grave si vous ne passez pas un objet qui contient un objet qui ne peut pas être picklé (ProcessPoolExecutor, queue.Queue, threading.Lock, threading.RLock, etc.) comme argument de la méthode transmise à ProcessPoolExecutor.
Ce problème est dû au fait que l'instance de classe contient un ProcessPoolExecutor, un objet qui ne peut pas être picklé, et est passé à plusieurs processus via l'argument self de la méthode d'instance.
Selon la documentation ProcessPoolExecutor
ProcessPoolExecutor uses the multiprocessing module, which allows it to side-step the Global Interpreter Lock but also means that only picklable objects can be executed and returned. (Source: https://docs.python.org/ja/3/library/concurrent.futures.html#processpoolexecutor)
Tel qu'il est écrit, il semble que seuls les objets qui peuvent être décapés peuvent être exécutés et retournés.
Par conséquent, cela peut être évité en n'utilisant pas la méthode d'instance (en utilisant une méthode qui ne prend pas self comme argument).
Ceci est un exemple de modification lors de l'utilisation de variables d'instance. «self.calc» et «self.hoge» sont des variables d'instance.
"""
Si vous utilisiez des variables d'instance
"""
import concurrent
import concurrent.futures
class Calc:
def __init__(self, a):
self.a = a
def calc(self, n):
return self.a + n
class Process:
def __init__(self):
self.process_list = []
self.executor = concurrent.futures.ProcessPoolExecutor(max_workers=4)
# self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
self.calc = Calc(10) #Instance de la classe à traiter
self.hoge = 3 #Variable d'instance
def _process_bad(self, n):
res = self.calc.calc(n) * self.hoge
return res
@staticmethod
def _process(calc, n, hoge):
res = calc.calc(n) * hoge
return res
def start_process(self, n):
#Département d'exécution
# self.process_list.append(self.executor.submit(self._process_bad, n)) # NG
self.process_list.append(self.executor.submit(self._process, self.calc, n, self.hoge)) # OK
def get_result(self):
#réduction
if __name__ == "__main__":
process = Process()
for i in range(10):
process.start_process(i)
result = process.get_result()
print(result)
Si vous l'avez écrit comme _process_bad ()
, vous devez passer toutes les variables utilisées dans la méthode comme arguments comme _process ()
.
De plus, la classe Calc
ne doit pas contenir d'objets qui ne peuvent pas être décapés.
À propos, en plus d'utiliser la méthode statique, cela fonctionne même si vous utilisez une méthode de classe ou appelez une méthode en dehors de la classe.
#Exemple de méthode de classe
@classmethod
def _process(cls, calc, n, hoge):
res = calc.calc(n) * hoge
return res
Lorsque j'ai rencontré ce problème, j'ai été arrêté avant d'obtenir les résultats, comme indiqué ci-dessous.
def get_result(self):
#Obtenir des résultats
self.executor.shutdown(wait=True) #Arrêtez avant d'obtenir
res_list = [res.result() for res in self.process_list]
self.process_list = []
return res_list
Cela provoquera son arrêt sans afficher de message d'erreur.
Aussi, j'ai mal compris qu'il n'y a pas de problème si je ne le mets même pas dans la valeur de retour pour les choses qui ne peuvent pas être marinées, donc je n'ai pas compris la cause pendant un moment.
C'était un processus multi-processus que j'ai écrit souffrant d'erreurs, mais le processus que je voulais faire avait un temps d'exécution qui n'était pas très différent du traitement multi-thread.
Lors de l'utilisation d'une bibliothèque externe, il semble que plusieurs processeurs puissent être utilisés même pour le traitement multi-thread.
[1] https://bugs.python.org/issue29423 (using concurrent.futures.ProcessPoolExecutor in class giving 'TypeError: can't pickle _thread.RLock objects' in 3.6, but not 3.5) [2] https://qiita.com/walkure/items/2751b5b8932873e7a5bf (L'expression lambda ne peut pas être transmise à func dans ProcessPoolExector.map) [3] https://qiita.com/kokumura/items/2e3afc1034d5aa7c6012 (mémo d'utilisation concurrent.futures) [4] https://docs.python.org/ja/3.6/library/concurrent.futures.html (17.4. Concurrent.futures - Exécution de tâches parallèles) [5] https://qiita.com/walkure/items/2751b5b8932873e7a5bf (Impossible de transmettre l'expression lambda à func dans ProcessPoolExector.map) [6] https://qiita.com/kaitolucifer/items/e4ace07bd8e112388c75#4-concurrentfutures (Compréhension complète du thread Python et du multitraitement)