Python 3.7.4, Windows 10
An error occurs when using ProcessPoolExecutor for parallel processing. Here is the code:
import concurrent
import concurrent.futures

class Process:
    def __init__(self):
        self.process_list = []
        self.executor = concurrent.futures.ProcessPoolExecutor(max_workers=4)
        # self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)  # works with thread-based processing

    def _process(self, n):
        # processing part
        return 1

    def start_process(self, n):
        # submit part
        self.process_list.append(self.executor.submit(self._process, n))

    def get_result(self):
        # collect the results
        print("wait...")
        concurrent.futures.wait(self.process_list, return_when=concurrent.futures.FIRST_EXCEPTION)
        print("all processes were finished")
        res_list = [res.result() for res in self.process_list]
        print("got result")
        self.executor.shutdown(wait=True)
        print("shutdown")
        self.process_list = []
        return res_list

if __name__ == "__main__":
    process = Process()
    for i in range(10):
        process.start_process(i)
    result = process.get_result()
    print(result)
When I run this code, the following error occurs at res_list = [res.result() for res in self.process_list]:
TypeError: can't pickle _thread.RLock objects
By the way, it works fine with ThreadPoolExecutor.
Change the processing part from:

def _process(self, n):
    # processing part
    return 1

to:

@staticmethod
def _process(n):
    # processing part
    return 1

If the processing part uses instance variables (through self), pass them in as arguments instead.
Likewise, if you were calling one of your own instance methods inside it, you need to change that method to one that does not take self as an argument (a staticmethod, classmethod, etc.).
As long as the arguments of the function submitted to ProcessPoolExecutor do not contain, directly or inside another object, anything that cannot be pickled (ProcessPoolExecutor, queue.Queue, threading.Lock, threading.RLock, etc.), I think it should work.
This issue arises because the class instance holds a ProcessPoolExecutor, which cannot be pickled, and the instance method passes that whole instance to the worker process through its self argument.
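To see this directly, you can try pickling an object that holds a lock. Below is a minimal sketch I am adding for illustration (the Holder class is made up); it fails in the same way as the self of the Process class above.

# Minimal illustration: an object holding an RLock cannot be pickled,
# just like a Process instance that holds a ProcessPoolExecutor.
import pickle
import threading

class Holder:
    def __init__(self):
        self.lock = threading.RLock()  # unpicklable member

try:
    pickle.dumps(Holder())
except TypeError as e:
    print(e)  # on Python 3.7: can't pickle _thread.RLock objects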
According to the ProcessPoolExecutor documentation:
ProcessPoolExecutor uses the multiprocessing module, which allows it to side-step the Global Interpreter Lock but also means that only picklable objects can be executed and returned. (Source: https://docs.python.org/ja/3/library/concurrent.futures.html#processpoolexecutor)
As stated there, only objects that can be pickled can be executed and returned.
Therefore, the error can be avoided by not using an instance method (i.e., by using a function or method that does not take self as an argument).
Here is a modified example for the case where instance variables are used. self.calc and self.hoge are instance variables.
"""
If you were using instance variables
"""
import concurrent
import concurrent.futures
class Calc:
def __init__(self, a):
self.a = a
def calc(self, n):
return self.a + n
class Process:
def __init__(self):
self.process_list = []
self.executor = concurrent.futures.ProcessPoolExecutor(max_workers=4)
# self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
self.calc = Calc(10) #Instance of the class to process
self.hoge = 3 #Instance variables
def _process_bad(self, n):
res = self.calc.calc(n) * self.hoge
return res
@staticmethod
def _process(calc, n, hoge):
res = calc.calc(n) * hoge
return res
def start_process(self, n):
#Execution department
# self.process_list.append(self.executor.submit(self._process_bad, n)) # NG
self.process_list.append(self.executor.submit(self._process, self.calc, n, self.hoge)) # OK
def get_result(self):
#abridgement
if __name__ == "__main__":
process = Process()
for i in range(10):
process.start_process(i)
result = process.get_result()
print(result)
If you wrote it like _process_bad(), you need to pass every variable the method uses as an argument, as in _process().
Also, the Calc class itself must not contain objects that cannot be pickled.
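As a quick sanity check (my own addition, restating the Calc class from the example above), you can confirm that Calc is picklable because it only holds plain data:

import pickle

class Calc:
    def __init__(self, a):
        self.a = a

    def calc(self, n):
        return self.a + n

pickle.dumps(Calc(10))  # no error: Calc only holds an int, so it can cross the process boundary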
By the way, besides staticmethod, it also works if you use a classmethod or a function defined outside the class.
# Example with classmethod
@classmethod
def _process(cls, calc, n, hoge):
    res = calc.calc(n) * hoge
    return res
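And here is a rough sketch of the "function defined outside the class" variant (my own example; the name process_outside is made up):

# Example with a function defined outside the class
def process_outside(calc, n, hoge):
    res = calc.calc(n) * hoge
    return res

# in Process.start_process:
#     self.process_list.append(self.executor.submit(process_outside, self.calc, n, self.hoge))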
When I first ran into this issue, I was shutting the executor down before getting the results, like this:
def get_result(self):
    # collect the results
    self.executor.shutdown(wait=True)  # shutdown before getting the results
    res_list = [res.result() for res in self.process_list]
    self.process_list = []
    return res_list
Written this way, the program just stops without showing any error message.
I had also misunderstood the situation, thinking it was enough not to put unpicklable objects in the return value, so it took me a while to figure out the cause.
After struggling with these errors to get the multiprocess version working, the task I actually wanted to run ended up taking about the same time as the multithreaded version.
When external libraries are used, it seems that multiple CPUs can be utilized even with multithreading.
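For example, NumPy releases the GIL during large array operations, so even ThreadPoolExecutor can keep several cores busy. A rough sketch under that assumption (my own example, requires NumPy):

import concurrent.futures
import numpy as np

def heavy(n):
    a = np.random.rand(n, n)
    return float(np.linalg.norm(a @ a))  # the matrix product can run with the GIL released

if __name__ == "__main__":
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as ex:
        print(list(ex.map(heavy, [1000] * 4)))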
[1] https://bugs.python.org/issue29423 (using concurrent.futures.ProcessPoolExecutor in class giving 'TypeError: can't pickle _thread.RLock objects' in 3.6, but not 3.5)
[2] https://qiita.com/walkure/items/2751b5b8932873e7a5bf (You cannot pass a lambda expression to func in ProcessPoolExecutor.map)
[3] https://qiita.com/kokumura/items/2e3afc1034d5aa7c6012 (concurrent.futures usage memo)
[4] https://docs.python.org/ja/3.6/library/concurrent.futures.html (17.4. concurrent.futures - Parallel task execution)
[5] https://qiita.com/kaitolucifer/items/e4ace07bd8e112388c75#4-concurrentfutures (Complete understanding of Python threading and multiprocessing)