How to kill a process instantly with Python's ProcessPoolExecutor

Introduction

Python offers several ways to run tasks in parallel across multiple processes. One of them is concurrent.futures.ProcessPoolExecutor.

This built-in class was added in Python 3.2. It uses multiprocessing internally, and it is easier to use because its interface is organized and deliberately restricted. If you just want an easy way to use multiple processes, this library is a good choice.

On the other hand, that small interface often falls short when you need to do something more complicated. One such case is how ProcessPoolExecutor manages its worker processes. It caused me trouble, so I am writing it down here.

Trouble

The function concurrent.futures.as_completed(fs, timeout=None) waits for Futures created by ProcessPoolExecutor to complete. As the signature shows, the second argument sets a timeout in seconds. If that time passes before all Futures have finished, the iterator raises concurrent.futures.TimeoutError.
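The timeout behaviour can be seen without any worker processes. The minimal sketch below passes bare concurrent.futures.Future objects to as_completed with a short timeout. One Future is already finished and the other never completes. The helper name first_results is hypothetical. The iterator yields the finished Future and then raises concurrent.futures.TimeoutError for the one still pending.

```python
import concurrent.futures
from concurrent.futures import Future


def first_results(futures, timeout):
    """Collect results until as_completed() gives up (hypothetical helper).

    Returns (results, timed_out).
    """
    results = []
    try:
        for future in concurrent.futures.as_completed(futures, timeout):
            results.append(future.result())
    except concurrent.futures.TimeoutError:
        # Raised by the iterator once `timeout` seconds have passed
        # while some futures are still unfinished.
        return results, True
    return results, False


done = Future()
done.set_result(42)  # already finished
never = Future()     # nothing will ever complete this one

print(first_results([done, never], timeout=0.2))  # ([42], True)
```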

When a timeout occurs, the natural next step is to cancel the running and waiting Futures in the concurrent.futures.TimeoutError handler. However, cancellation does not work on a Future that is already running: Future.cancel() simply returns False. (The documentation says so. Oddly, in the Japanese translation of the docs only the explanation of cancel() is still in English.)
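This rule can be checked with a bare Future. Executors call set_running_or_notify_cancel() to move a Future into the running state. The documentation says this method is meant for Executor implementations and unit tests. After that call, cancel() returns False. A Future that is still pending can be cancelled normally.

```python
from concurrent.futures import Future

pending = Future()
print(pending.cancel(), pending.cancelled())  # True True

running = Future()
running.set_running_or_notify_cancel()       # what an executor does when work starts
print(running.cancel(), running.cancelled())  # False False
print(running.running())                      # True
```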

This becomes a problem when the task being executed waits indefinitely and you want to end it right away. For example, I/O may be blocked inside the worker process. If the task is a method, the ideal fix is to write the cleanup in __del__. You could argue that code should never wait indefinitely in the first place, but sometimes you still want the main process to enforce a timeout.

Here is the actual code:

import time
import concurrent.futures
from concurrent.futures import ProcessPoolExecutor


def test(value: int) -> int:
    """
    Sleep for 100 seconds, then return the specified value.

    Args:
        value: Specified value
    """
    time.sleep(100)
    return value


def main():
    with ProcessPoolExecutor(max_workers=2) as executor:
        # Create the futures
        futures = []
        for index in range(5):
            future = executor.submit(test, index)
            futures.append(future)

        # Wait for the results, giving up after `timeout` seconds
        try:
            timeout = 5
            for future in concurrent.futures.as_completed(futures, timeout):
                result = future.result()
                print(result)

        except concurrent.futures.TimeoutError:
            # Show the current state of each future
            print("Timeout -----")
            for future in futures:
                print(id(future), f"running: {future.running()}", f"cancelled: {future.cancelled()}")

            # Cancel the futures that have not started yet
            for future in futures:
                if not future.running():
                    future.cancel()

    # Leaving the with block calls executor.shutdown(wait=True),
    # so we only get here once the running futures have finished.
    print("Executor Shutdown -----")
    for future in futures:
        print(id(future), f"running: {future.running()}", f"cancelled: {future.cancelled()}")


if __name__ == "__main__":
    main()

This code starts two workers and submits five tasks. Each task sleeps for 100 seconds and then returns the value it was given. Running it should print something like this:

Timeout -----
2579991022224 running: True cancelled: False
2579991067424 running: True cancelled: False
2579991117104 running: True cancelled: False
2579991116480 running: False cancelled: False
2579991117536 running: False cancelled: False
Executor Shutdown -----
2579991022224 running: True cancelled: False
2579991067424 running: True cancelled: False
2579991117104 running: True cancelled: False
2579991116480 running: False cancelled: True
2579991117536 running: False cancelled: True

The first three Futures are still running, so the main process cannot finish (the last two have been cancelled). Leaving the with block calls executor.shutdown(wait=True), which waits for the running Futures to complete. As written, the main process ends only after about 200 seconds.
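A little arithmetic reproduces these numbers. In CPython, the executor hands up to max_workers + 1 calls to its internal call queue and marks each one as running when it does so. The extra slot is the module-level constant EXTRA_QUEUED_CALLS = 1 in concurrent/futures/process.py, which is an implementation detail. That is why three Futures report running: True with two workers. The sketch below estimates how long the with block stays blocked. It assumes that all five tasks were submitted before the workers began taking work, as in the run above. The function name is hypothetical, and the model ignores process start-up time.

```python
import math

# Mirrors CPython's internal EXTRA_QUEUED_CALLS constant
# (an implementation detail of concurrent/futures/process.py, assumed to be 1).
EXTRA_QUEUED_CALLS = 1


def estimated_total_seconds(n_submitted: int, max_workers: int, task_seconds: float) -> float:
    """Rough wall time before shutdown(wait=True) returns (hypothetical helper).

    Futures already marked running cannot be cancelled, so all of them must
    finish, max_workers at a time, each taking task_seconds.
    """
    running = min(n_submitted, max_workers + EXTRA_QUEUED_CALLS)
    rounds = math.ceil(running / max_workers)
    return rounds * task_seconds


print(estimated_total_seconds(5, 2, 100))  # 200
```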

Solution

The worker processes that the executor manages internally are reachable from our code, so we kill them directly. Here is the actual code:

import time
import concurrent.futures
from concurrent.futures import ProcessPoolExecutor


def test(value: int) -> int:
    """
    Sleep for 100 seconds, then return the specified value.

    Args:
        value: Specified value
    """
    time.sleep(100)
    return value


def main():
    with ProcessPoolExecutor(max_workers=2) as executor:
        # Create the futures
        futures = []
        for index in range(5):
            future = executor.submit(test, index)
            futures.append(future)

        # Wait for the results, giving up after `timeout` seconds
        try:
            timeout = 5
            for future in concurrent.futures.as_completed(futures, timeout):
                result = future.result()
                print(result)

        except concurrent.futures.TimeoutError:
            # Show the current state of each future
            print("Timeout -----")
            for future in futures:
                print(id(future), f"running: {future.running()}", f"cancelled: {future.cancelled()}")

            # Cancel the futures that have not started yet
            for future in futures:
                if not future.running():
                    future.cancel()

            # Kill the worker processes
            # !! Added here !!
            # _processes is a protected attribute (pid -> multiprocessing.Process);
            # copy the values so the dict cannot change while we iterate.
            for process in list(executor._processes.values()):
                process.kill()

    # Check the state of the futures after shutdown
    print("Executor Shutdown -----")
    for future in futures:
        print(id(future), f"running: {future.running()}", f"cancelled: {future.cancelled()}")


if __name__ == "__main__":
    main()

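Here the worker processes are killed after the pending Futures are cancelled. _processes is a protected (underscore-prefixed) attribute of ProcessPoolExecutor, so we are not really supposed to touch it. Still, killing processes like this is routine when you use multiprocessing directly. (Process.kill() was added in Python 3.7. On older versions, terminate() can be used instead.)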

When you do this, you should see something like this:

Timeout -----
1348685908624 running: True cancelled: False
1348685953824 running: True cancelled: False
1348686003552 running: True cancelled: False
1348686003888 running: False cancelled: False
1348686004128 running: False cancelled: False
Executor Shutdown -----
1348685908624 running: False cancelled: False
1348685953824 running: False cancelled: False
1348686003552 running: False cancelled: False
1348686003888 running: False cancelled: True
1348686004128 running: False cancelled: True

The first three Futures now report running: False. Their worker processes are gone, so the executor has nothing left to wait for, and the main process ends immediately.
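Note that running: False does not mean those three Futures completed. In CPython, a worker process that dies abruptly causes the executor to mark the pool as broken. It then sets a concurrent.futures.process.BrokenProcessPool exception on the Futures that were still running or pending, so calling result() on them raises instead of returning. The sketch below wraps the kill step in a reusable helper and checks that outcome. The name kill_worker_processes is hypothetical. The helper takes a snapshot of the dict before iterating. It also tolerates the attribute being None, which is what CPython's shutdown() sets it to. Because it still relies on the private _processes attribute, treat it as version-dependent. Python 3.14 documents public ProcessPoolExecutor.terminate_workers() and kill_workers() methods for this purpose, so check your version before relying on the private attribute.

```python
import time
from concurrent.futures import ProcessPoolExecutor, wait
from concurrent.futures.process import BrokenProcessPool


def kill_worker_processes(executor) -> int:
    """Kill all worker processes of a ProcessPoolExecutor (hypothetical helper).

    Relies on the private attribute `_processes` (pid -> multiprocessing.Process
    in CPython), so it may break in other Python versions.
    Returns the number of processes that kill() was called on.
    """
    processes = getattr(executor, "_processes", None) or {}
    # Snapshot the values: the executor's management thread also uses this dict.
    victims = list(processes.values())
    for process in victims:
        # SIGKILL on POSIX; on Windows kill() is the same as terminate() (Python 3.7+)
        process.kill()
    return len(victims)


if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=1) as executor:
        future = executor.submit(time.sleep, 60)
        _, not_done = wait([future], timeout=0.5)
        if not_done:
            print("killed", kill_worker_processes(executor), "worker(s)")
    # Leaving the with block no longer waits 60 seconds; the killed task's
    # future carries the broken-pool error instead of a result.
    print(isinstance(future.exception(), BrokenProcessPool))
```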

Summary

It is a bit of a hack, but this is how to terminate worker processes immediately with ProcessPoolExecutor. You probably won't need it often, but it may come in handy in the rare cases where it matters.

Digression

The internals of ProcessPoolExecutor live in ~\lib\concurrent\futures\process.py. The beginning of that file explains how it works with a diagram, and it is very easy to follow, so I recommend reading it.
[Figure: ProcessPoolExecutor.png]
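The exact path depends on how Python was installed. A quick way to find the file on your own machine is to ask the module itself. In CPython, the diagram is part of the module docstring, so it can be printed directly. This is a small sketch. The docstring is None if Python runs with -OO, which strips docstrings.

```python
import concurrent.futures.process as process_module

# Where this interpreter's ProcessPoolExecutor implementation lives
print(process_module.__file__)

# The data-flow diagram is part of the module docstring
# (None under python -OO, which strips docstrings).
print(process_module.__doc__)
```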
