[Python] I examined practical patterns for asynchronous processing that runs in parallel with the main thread (multiprocessing, asyncio).

I looked at practical patterns for asynchronous processing in Python that meet the following requirements.

- Run the main processing and the asynchronous processing in parallel.
- Wait, at a chosen point in the main processing, until the asynchronous processing has finished.
- Get the return value of the asynchronous processing through a future.

The code is a bit verbose, but each listing is complete, including the logger setup, so you can copy and paste it as is.

Case 1 Process main thread and asynchronous thread in parallel (using multiprocessing)

multiprocessing1.py


from multiprocessing.pool import ThreadPool
import time
import threading as th
import logging

#Get logger
def get_logger():
    logger = logging.getLogger("multiprocessing_test")
    logger.setLevel(logging.DEBUG)
    logger.propagate = False

    ch = logging.StreamHandler()
    ch.setLevel(logging.DEBUG)
    ch_formatter = logging.Formatter('%(asctime)s - %(message)s')
    ch.setFormatter(ch_formatter)
    
    logger.addHandler(ch)
    return logger

logger = get_logger()

def async_func(name, sleep_time):
    #Get thread id
    thread_id = th.get_ident()

    logger.info(f"thread_id:{thread_id} name:{name} async_start func")
    time.sleep(sleep_time)
    logger.info(f"thread_id:{thread_id} name:{name} async_end of func")

    return f"{thread_id}-{name}"

if __name__ == "__main__":
    #Create a thread pool to run the asynchronous function
    #processes sets the maximum number of worker threads that run at the same time
    pool = ThreadPool(processes=1)

    #Get thread id
    thread_id = th.get_ident()

    #Start the asynchronous processing. The first argument is the function object, the second is a tuple of its arguments.
    logger.info(f"thread_id:{thread_id} Call asynchronous processing from main")
    future = pool.apply_async(async_func, ("Thread 1", 10))

    #Processing that you want to execute in the main thread in parallel with asynchronous processing
    logger.info(f"thread_id:{thread_id} main Start processing during asynchronous processing")
    time.sleep(5)
    logger.info(f"thread_id:{thread_id} main End of processing during asynchronous processing")

    #Wait for the asynchronous process to finish and get the result.
    result = future.get()
    logger.info(f"thread_id:{thread_id} Get the result of asynchronous processing:{result}")

    #Stop accepting new tasks and wait for the worker threads to exit
    pool.close()
    pool.join()

Execution result

2020-10-15 16:43:27,073 - thread_id:18440 Call asynchronous processing from main
2020-10-15 16:43:27,074 - thread_id:18440 main Start processing during asynchronous processing
2020-10-15 16:43:27,074 - thread_id:18132 name:Thread 1 async_start func
2020-10-15 16:43:32,074 - thread_id:18440 main End of processing during asynchronous processing
2020-10-15 16:43:37,075 - thread_id:18132 name:Thread 1 async_end of func
2020-10-15 16:43:37,075 - thread_id:18440 Get the result of asynchronous processing:18132-Thread 1

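The log shows that at 16:43:27 the main thread logs "main Start processing during asynchronous processing" and async_func logs "async_start func" at the same time, so the two are running in parallel. The main thread finishes its own work at 16:43:32 and then blocks in future.get() until async_func returns at 16:43:37.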
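
One way to confirm the overlap without reading timestamps is to time the whole run: if the two pieces of work overlap, the elapsed time should be close to the longer of the two sleeps rather than their sum. The following is a minimal sketch under that assumption; slow_task and run_overlapped are hypothetical helper names, and the sleep times are shortened for illustration.

```python
from multiprocessing.pool import ThreadPool
import time


def slow_task(sleep_time):
    # Hypothetical stand-in for the asynchronous work.
    time.sleep(sleep_time)
    return "done"


def run_overlapped(main_time, async_time):
    """Run main-side work alongside one pooled task; return (result, elapsed seconds)."""
    started = time.perf_counter()
    with ThreadPool(processes=1) as pool:
        future = pool.apply_async(slow_task, (async_time,))
        time.sleep(main_time)   # main-side work, overlapping the pooled task
        result = future.get()   # block here until the pooled task has finished
    return result, time.perf_counter() - started


if __name__ == "__main__":
    result, elapsed = run_overlapped(main_time=0.5, async_time=1.0)
    print(f"result={result} elapsed={elapsed:.2f}s")
```

If the work ran sequentially, the elapsed time would be roughly main_time plus async_time instead.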

Case 2 Process main thread and multiple asynchronous threads in parallel (using multiprocessing)

multiprocessing2.py


from multiprocessing.pool import ThreadPool
import time
import threading as th
import logging

#Get logger
def get_logger():
    logger = logging.getLogger("multiprocessing_test")
    logger.setLevel(logging.DEBUG)
    logger.propagate = False

    ch = logging.StreamHandler()
    ch.setLevel(logging.DEBUG)
    ch_formatter = logging.Formatter('%(asctime)s - %(message)s')
    ch.setFormatter(ch_formatter)
    
    logger.addHandler(ch)
    return logger

logger = get_logger()

def async_func(name, sleep_time):
    #Get thread id
    thread_id = th.get_ident()

    logger.info(f"thread_id:{thread_id} name:{name} async_start func")
    time.sleep(sleep_time)
    logger.info(f"thread_id:{thread_id} name:{name} async_end of func")

    return f"{thread_id}-{name}"

if __name__ == "__main__":
    #Create a thread pool to run the asynchronous function
    #processes sets the maximum number of worker threads that run at the same time
    pool = ThreadPool(processes=5)

    #Get thread id
    thread_id = th.get_ident()

    #Start the asynchronous processing. The first argument is the function object, the second is a tuple of its arguments.
    logger.info(f"thread_id:{thread_id} Call asynchronous processing from main")
    futures = []
    for i in range(5):
        future = pool.apply_async(async_func, (f"Thread {i + 1}", 10)) # Tuple of args for async_func
        futures.append(future)

    #Processing that you want to execute in the main thread in parallel with asynchronous processing
    logger.info(f"thread_id:{thread_id} main Start processing during asynchronous processing")
    time.sleep(5)
    logger.info(f"thread_id:{thread_id} main End of processing during asynchronous processing")

    #Wait for all asynchronous processes to finish and get the results.
    results = [future.get() for future in futures]
    logger.info(f"thread_id:{thread_id} Get the result of asynchronous processing:{results}")

    #Stop accepting new tasks and wait for the worker threads to exit
    pool.close()
    pool.join()

Execution result

2020-10-15 16:47:41,977 - thread_id:13448 Call asynchronous processing from main
2020-10-15 16:47:41,978 - thread_id:13448 main Start processing during asynchronous processing
2020-10-15 16:47:41,979 - thread_id:23216 name:Thread 1 async_start func
2020-10-15 16:47:41,979 - thread_id:21744 name:Thread 2 async_start func
2020-10-15 16:47:41,979 - thread_id:21708 name:Thread 3 async_start func
2020-10-15 16:47:41,979 - thread_id:21860 name:Thread 4 async_start func
2020-10-15 16:47:41,979 - thread_id:22100 name:Thread 5 async_start func
2020-10-15 16:47:46,980 - thread_id:13448 main End of processing during asynchronous processing
2020-10-15 16:47:51,982 - thread_id:21744 name:Thread 2 async_end of func
2020-10-15 16:47:51,982 - thread_id:23216 name:Thread 1 async_end of func
2020-10-15 16:47:51,983 - thread_id:21708 name:Thread 3 async_end of func
2020-10-15 16:47:51,984 - thread_id:21860 name:Thread 4 async_end of func
2020-10-15 16:47:51,984 - thread_id:22100 name:Thread 5 async_end of func
2020-10-15 16:47:51,986 - thread_id:13448 Get the result of asynchronous processing:['23216-Thread 1', '21744-Thread 2', '21708-Thread 3', '21860-Thread 4', '22100-Thread 5']

The log shows that at 16:47:41 the main thread's processing and all five async_func calls start at the same time and run in parallel. If you reduce the pool size, for example with ThreadPool(processes=3), three tasks start first while the other two wait in the queue; each waiting task starts as soon as a worker thread becomes free.
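
The effect of a smaller pool can be sketched as follows. This is an illustrative example rather than part of the original listings: worker and run_limited are hypothetical names, and each task returns its worker thread ID and start time so the queueing can be seen directly. With a pool of three and five tasks, at most three distinct thread IDs should appear, and two tasks should start only after an earlier one has finished.

```python
from multiprocessing.pool import ThreadPool
import threading
import time


def worker(name, sleep_time):
    # Record when the task started and which worker thread ran it.
    started = time.perf_counter()
    time.sleep(sleep_time)
    return name, threading.get_ident(), started


def run_limited(pool_size, task_count, sleep_time):
    """Submit task_count tasks to a pool of pool_size threads and collect the results in order."""
    with ThreadPool(processes=pool_size) as pool:
        futures = [pool.apply_async(worker, (f"Thread {i + 1}", sleep_time))
                   for i in range(task_count)]
        return [future.get() for future in futures]


if __name__ == "__main__":
    results = run_limited(pool_size=3, task_count=5, sleep_time=1.0)
    first = min(started for _, _, started in results)
    for name, ident, started in results:
        print(f"{name}: worker {ident}, started at +{started - first:.1f}s")
```

Because the results are collected in submission order, the returned list keeps the order Thread 1 to Thread 5 even though the tasks finish at different times.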

Case 3 Process main thread and multiple asynchronous threads in parallel (using asyncio)

asyncio1.py


import asyncio
import itertools
import logging

#Get logger
def get_logger():
    logger = logging.getLogger("asyncio_test")
    logger.setLevel(logging.DEBUG)
    logger.propagate = False

    ch = logging.StreamHandler()
    ch.setLevel(logging.DEBUG)
    ch_formatter = logging.Formatter('%(asctime)s - %(message)s')
    ch.setFormatter(ch_formatter)
    
    logger.addHandler(ch)
    return logger

logger = get_logger()

#Get something like a task id
#* asyncio runs every task on the same thread, so the thread ID is identical for all of them.
#  A simple counter is used instead to give each asynchronous call its own ID.
_next_id = itertools.count().__next__
def get_task_id():
    return _next_id()

async def async_func(name, sleep_time):
    #Get task id
    task_id = get_task_id()

    logger.info(f"task_id:{task_id} name:{name} async_start func")
    await asyncio.sleep(sleep_time)
    logger.info(f"task_id:{task_id} name:{name} async_end of func")

    return f"{task_id}-{name}"

async def async_func_caller():
    #Get task id
    task_id = get_task_id()

    #Create the asynchronous tasks
    #* ensure_future only schedules the tasks here; they start running
    #  once this coroutine yields to the event loop at the next await.
    futures = [asyncio.ensure_future(async_func(f"task{i + 1}", 10)) for i in range(5)]

    #Processing that you want to execute in the main thread in parallel with asynchronous processing
    logger.info(f"task_id:{task_id} async_func_caller Start processing during asynchronous processing execution")
    await asyncio.sleep(5)
    logger.info(f"task_id:{task_id} async_func_caller End of processing during asynchronous processing execution")

    #Wait for the asynchronous process to finish and get the result.
    results = await asyncio.gather(*futures)

    return results


if __name__ == "__main__":
    logger.info("main Start processing during asynchronous processing")

    #Run the coroutine on a new event loop and wait until it finishes
    #(asyncio.run requires Python 3.7 or later and closes the loop itself)
    ret = asyncio.run(async_func_caller())
    logger.info(f"main End of processing during asynchronous processing Result:{ret}")

Execution result

2020-10-15 16:49:40,132 - main Start processing during asynchronous processing
2020-10-15 16:49:40,134 - task_id:0 async_func_caller Start processing during asynchronous processing execution
2020-10-15 16:49:40,134 - task_id:1 name:task1 async_start func
2020-10-15 16:49:40,135 - task_id:2 name:task2 async_start func
2020-10-15 16:49:40,135 - task_id:3 name:task3 async_start func
2020-10-15 16:49:40,136 - task_id:4 name:task4 async_start func
2020-10-15 16:49:40,136 - task_id:5 name:task5 async_start func
2020-10-15 16:49:45,138 - task_id:0 async_func_caller End of processing during asynchronous processing execution
2020-10-15 16:49:50,141 - task_id:2 name:task2 async_end of func
2020-10-15 16:49:50,142 - task_id:5 name:task5 async_end of func
2020-10-15 16:49:50,142 - task_id:4 name:task4 async_end of func
2020-10-15 16:49:50,144 - task_id:1 name:task1 async_end of func
2020-10-15 16:49:50,144 - task_id:3 name:task3 async_end of func
2020-10-15 16:49:50,145 - main End of processing during asynchronous processing Result:['1-task1', '2-task2', '3-task3', '4-task4', '5-task5']

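The log shows that at 16:49:40 the processing in async_func_caller and all five async_func tasks start at almost the same time and run concurrently. The caller finishes its own work at 16:49:45 and then waits in asyncio.gather until the tasks finish at 16:49:50. The results come back in the order the tasks were created, even though the tasks finish in a different order.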
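
The reason Case 3 uses a counter instead of a thread ID can be checked with a small sketch. It assumes Python 3.7 or later for asyncio.run; report_thread and collect are hypothetical names introduced for illustration. Each task returns the ID of the thread it ran on, and all of them should match the main thread.

```python
import asyncio
import threading


async def report_thread(name, sleep_time):
    # Yield to the event loop, then report which thread resumed this task.
    await asyncio.sleep(sleep_time)
    return name, threading.get_ident()


async def collect(task_count, sleep_time):
    """Schedule task_count tasks and gather their results in creation order."""
    tasks = [asyncio.ensure_future(report_thread(f"task{i + 1}", sleep_time))
             for i in range(task_count)]
    return await asyncio.gather(*tasks)


if __name__ == "__main__":
    main_ident = threading.get_ident()
    for name, ident in asyncio.run(collect(5, 0.1)):
        print(f"{name}: same thread as main = {ident == main_ident}")
```

This is the main practical difference from the ThreadPool cases: the tasks take turns on one thread and only overlap while they are waiting at an await.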

We hope this will be helpful when implementing asynchronous processing.
