There are several ways to run tasks in parallel across multiple processes in Python. One of them is concurrent.futures.ProcessPoolExecutor.
This is a built-in class that was added in Python 3.2. It uses multiprocessing internally, but it is easy to use because it offers a simpler, deliberately restricted interface. It is the recommended choice if you just want to use multiple processes without fuss.
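For reference, basic usage looks roughly like the sketch below: `submit()` returns a Future, and `result()` blocks until that Future is done. The `square` and `squares` functions are hypothetical examples, not part of the article's code. The `if __name__ == "__main__":` guard is needed on platforms that start workers with the spawn method (e.g. Windows).

```python
from concurrent.futures import ProcessPoolExecutor


def square(x: int) -> int:
    """Work to run in a worker process (hypothetical example task)."""
    return x * x


def squares(values):
    # Leaving the with block shuts the pool down and waits for the workers.
    with ProcessPoolExecutor(max_workers=2) as executor:
        # submit() schedules the call and immediately returns a Future.
        futures = [executor.submit(square, v) for v in values]
        # result() blocks until each Future has finished.
        return [f.result() for f in futures]


if __name__ == "__main__":  # required with the spawn start method
    print(squares(range(5)))  # [0, 1, 4, 9, 16]
```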
On the other hand, because the interface is so small, it often can't quite do what you need once the processing gets complicated. One such case I ran into involves the process management that ProcessPoolExecutor does internally, so I'm writing it down here.
To wait for the Futures created by ProcessPoolExecutor to complete, there is the function concurrent.futures.as_completed(fs, timeout=None). As the signature suggests, you can pass a timeout in seconds as the second argument; if it expires before the Futures complete, concurrent.futures.TimeoutError is raised.
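A minimal sketch of the timeout behaviour is below. It uses ThreadPoolExecutor only so that it runs quickly; as_completed() works the same way with Futures from either executor. The helpers `slow` and `collect_with_timeout` are hypothetical. Note that the timeout is measured from the call to as_completed(), not per Future.

```python
import concurrent.futures
import time


def slow(seconds: float) -> float:
    """Sleep for `seconds`, then return it (hypothetical task)."""
    time.sleep(seconds)
    return seconds


def collect_with_timeout(delays, timeout):
    """Collect results that finish within `timeout` seconds.

    Returns (results, timed_out). Leaving the with block still waits
    for any task that is already running.
    """
    results = []
    timed_out = False
    with concurrent.futures.ThreadPoolExecutor(max_workers=len(delays)) as executor:
        futures = [executor.submit(slow, d) for d in delays]
        try:
            for future in concurrent.futures.as_completed(futures, timeout=timeout):
                results.append(future.result())
        except concurrent.futures.TimeoutError:
            timed_out = True
    return results, timed_out


if __name__ == "__main__":
    print(collect_with_timeout([0.05, 1.0], timeout=0.3))  # ([0.05], True)
```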
When the timeout fires, you will probably want to cancel the Futures that are running or waiting, using Future.cancel(). However, cancel() does not work on Futures that are already running: it simply returns False. (This is stated in the documentation, although, oddly, in the Japanese translation of the docs only the description of cancel() is left in English.)
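The difference between a running and a pending Future can be reproduced deterministically with a single worker. This sketch uses ThreadPoolExecutor and two threading.Event objects (an illustrative setup, not from the article) so that the first task is guaranteed to be executing when cancel() is called; the cancel() semantics are the same for ProcessPoolExecutor.

```python
import concurrent.futures
import threading


def cancel_demo():
    """Return (cancel() on running, cancel() on pending, running result, pending cancelled?)."""
    started = threading.Event()
    release = threading.Event()

    def blocker() -> str:
        started.set()   # signal that this task is now executing
        release.wait()  # block until the main thread lets it finish
        return "done"

    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        running = executor.submit(blocker)
        pending = executor.submit(blocker)  # queued behind `running`: only one worker
        started.wait()
        running_cancelled = running.cancel()  # False: already executing
        pending_cancelled = pending.cancel()  # True: never started
        release.set()                         # let the running task finish
    return running_cancelled, pending_cancelled, running.result(), pending.cancelled()


if __name__ == "__main__":
    print(cancel_demo())  # (False, True, 'done', True)
```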
Not being able to cancel a running Future becomes a problem when the task being executed waits indefinitely and you want to end it right away.
(For example, when I/O is locked inside the running process. If the work is done in an object's method, ideally the release logic would be written in __del__.)
You could argue that code which can wait forever is the real problem, but I think there are times when you want the main process to enforce a timeout.
Here is the code:
```python
import concurrent.futures
import time
from concurrent.futures import ProcessPoolExecutor


def test(value: int) -> int:
    """
    Sleep and return the specified value
    Args:
        value: Specified value
    """
    time.sleep(100)
    return value


def main():
    with ProcessPoolExecutor(max_workers=2) as executor:
        # Create the futures
        futures = []
        for index in range(5):
            future = executor.submit(test, index)
            futures.append(future)

        # Run
        try:
            timeout = 5
            for future in concurrent.futures.as_completed(futures, timeout):
                result = future.result()
                print(result)
        except concurrent.futures.TimeoutError:
            # Show the current state of each future
            print("Timeout -----")
            for future in futures:
                print(id(future), f"running: {future.running()}", f"cancelled: {future.cancelled()}")

            # Cancel the futures that have not started yet
            # (cancel() has no effect on running ones)
            for future in futures:
                if not future.running():
                    future.cancel()

        # Check the state of the futures just before the executor shuts down
        print("Executor Shutdown -----")
        for future in futures:
            print(id(future), f"running: {future.running()}", f"cancelled: {future.cancelled()}")
    # Leaving the with block calls executor.shutdown(wait=True),
    # which blocks until the running futures have finished.


if __name__ == "__main__":  # required when workers are spawned (e.g. Windows)
    main()
```
This code creates two workers and submits five tasks, each running a function that sleeps for 100 seconds and then returns the value it was given. When you run it, you should see something like this:
```
Timeout -----
2579991022224 running: True cancelled: False
2579991067424 running: True cancelled: False
2579991117104 running: True cancelled: False
2579991116480 running: False cancelled: False
2579991117536 running: False cancelled: False
Executor Shutdown -----
2579991022224 running: True cancelled: False
2579991067424 running: True cancelled: False
2579991117104 running: True cancelled: False
2579991116480 running: False cancelled: True
2579991117536 running: False cancelled: True
```
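The first three Futures are still running (the last two have been cancelled), and the main process gets stuck instead of exiting. This is because leaving the with block shuts the executor down and waits for the running Futures to finish. Three Futures report running: True even though there are only two workers because ProcessPoolExecutor moves up to max_workers + 1 calls into its internal call queue and marks them as running at that point (see EXTRA_QUEUED_CALLS in process.py). As written, the main process ends only after about 200 seconds: the two workers first run for 100 seconds, and then the third queued task runs for another 100.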
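Since Python 3.9, Executor.shutdown() accepts cancel_futures=True, which cancels every Future that has not started in one call and could replace the manual cancel loop. It still leaves running Futures alone, though, so it does not solve the problem above. The sketch below assumes Python 3.9+ and uses ThreadPoolExecutor only so that it runs quickly; the same parameter exists on ProcessPoolExecutor, where "running" also covers the calls already queued to workers. The `shutdown_demo` helper is hypothetical.

```python
import concurrent.futures
import threading


def shutdown_demo(n_tasks: int = 4):
    """Show that shutdown(cancel_futures=True) cancels only pending futures."""
    started = threading.Event()
    release = threading.Event()

    def blocker() -> str:
        started.set()
        release.wait()
        return "done"

    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    futures = [executor.submit(blocker) for _ in range(n_tasks)]
    started.wait()  # the first task is now executing
    # Python 3.9+: cancel everything still queued and return without waiting.
    executor.shutdown(wait=False, cancel_futures=True)
    states = [f.cancelled() for f in futures]
    release.set()  # only now can the running task finish
    return states, futures[0].result()


if __name__ == "__main__":
    print(shutdown_demo())  # ([False, True, True, True], 'done')
```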
The executor keeps references to the worker processes it manages internally, so we can reach them and kill them directly. Here is the code:
```python
import concurrent.futures
import time
from concurrent.futures import ProcessPoolExecutor


def test(value: int) -> int:
    """
    Sleep and return the specified value
    Args:
        value: Specified value
    """
    time.sleep(100)
    return value


def main():
    with ProcessPoolExecutor(max_workers=2) as executor:
        # Create the futures
        futures = []
        for index in range(5):
            future = executor.submit(test, index)
            futures.append(future)

        # Run
        try:
            timeout = 5
            for future in concurrent.futures.as_completed(futures, timeout):
                result = future.result()
                print(result)
        except concurrent.futures.TimeoutError:
            # Show the current state of each future
            print("Timeout -----")
            for future in futures:
                print(id(future), f"running: {future.running()}", f"cancelled: {future.cancelled()}")

            # Cancel the futures that have not started yet
            for future in futures:
                if not future.running():
                    future.cancel()

            # Kill the worker processes
            # !! Added part !!
            # _processes is a private dict {pid: Process}; iterate over a copy
            # because the executor's management thread may also modify it.
            for process in list(executor._processes.values()):
                process.kill()

        # Check the state of the futures just before the executor shuts down
        print("Executor Shutdown -----")
        for future in futures:
            print(id(future), f"running: {future.running()}", f"cancelled: {future.cancelled()}")


if __name__ == "__main__":  # required when workers are spawned (e.g. Windows)
    main()
```
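The worker processes are killed after the pending Futures have been cancelled. In ProcessPoolExecutor, _processes is a protected attribute (note the leading underscore), so it is not really meant to be accessed from outside... although if you use multiprocessing directly, killing a process like this is perfectly ordinary.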
When you do this, you should see something like this:
```
Timeout -----
1348685908624 running: True cancelled: False
1348685953824 running: True cancelled: False
1348686003552 running: True cancelled: False
1348686003888 running: False cancelled: False
1348686004128 running: False cancelled: False
Executor Shutdown -----
1348685908624 running: False cancelled: False
1348685953824 running: False cancelled: False
1348686003552 running: False cancelled: False
1348686003888 running: False cancelled: True
1348686004128 running: False cancelled: True
```
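running is now False for the first three Futures as well. Since no Future is running anymore, the main process exits immediately. (As far as I understand, killing a worker marks the pool as broken, so the executor finishes those Futures with a BrokenProcessPool exception instead of a result.)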
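For comparison, this is what the "ordinary" multiprocessing approach mentioned above looks like with only public APIs: wait with Process.join(timeout) and, if the child is still alive, call Process.kill() (Python 3.7+) and join it again to reap it. The `sleeper` task and `run_with_deadline` helper are hypothetical. The exact non-zero exit code after a kill depends on the platform, so the sketch only reports it.

```python
import multiprocessing
import time


def sleeper(seconds: float) -> None:
    """Stand-in for a task that may hang (hypothetical)."""
    time.sleep(seconds)


def run_with_deadline(seconds: float, deadline: float):
    """Run sleeper(seconds) in a child process; kill it if it exceeds `deadline`.

    Returns (killed, exitcode).
    """
    process = multiprocessing.Process(target=sleeper, args=(seconds,))
    process.start()
    process.join(timeout=deadline)  # wait at most `deadline` seconds
    killed = process.is_alive()
    if killed:
        process.kill()   # SIGKILL on POSIX
        process.join()   # reap the child so it does not linger
    return killed, process.exitcode


if __name__ == "__main__":  # required with the spawn start method
    print(run_with_deadline(100, deadline=1))  # killed is True, exit code non-zero
    print(run_with_deadline(0.1, deadline=5))  # (False, 0)
```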
It is a hacky approach, but this is how you can terminate the processes of a ProcessPoolExecutor immediately. I don't expect to need it often, but it can help in the rare cases where a hung task becomes a problem, so I hope it is useful.
The internals of ProcessPoolExecutor live in ~\lib\concurrent\futures\process.py. The comment at the top of that file explains how it works, with a diagram, and is very easy to understand, so I recommend reading it.