I've been studying since yesterday, but I feel like I've finally come close to the shape I want to use, so I'll make a note here again.
--I want to move processing asynchronously without blocking it in a separate process. ――I want to catch the end properly
I wanted to realize that. I left a note yesterday, but it feels like I've settled down here.
-How to use python multiprocessing -How to use python multiprocessing (continued) Pool -How to use python multiprocessing (continued 3) apply_async in class with Pool as a member
As a specification
--The input of the process is called by the Push method that takes a parameter as an argument. This method does not block. --Internally, it has a record of in-process and end processing. --If you need to tell the end, you can implement that too
So
--Implement one class. Only the processing is an individual function. (Image to external file)
--Use multiprocessing.Pool
. Besides this, manage the result of apply_async
.
--The callback function set when applying_async is also set as a method of the class so that the queue that is a member of the class can be accessed.
--(I don't know if this is good) Internally, it is managed using job_id. Implement so that the number is unique to the input processing request.
I wrote it like this.
from multiprocessing import Pool
from time import sleep
from os import getpid, getppid, cpu_count
from datetime import datetime, time, date, timedelta
import sys, json
def _data_handler(o):
""" json.default with dump=_data_Use as a handler"""
if isinstance(o, (datetime, date) ):
return o.strftime('%Y/%m/%d %H:%M:%S')
elif hasattr(o, "isoformat"):
return o.isoformat()
else:
return str(o)
class JobError(Exception):
def __init__(self, job_id:int, pid:int, msg:str, error):
self.job_id = job_id
self.pid = pid
self.msg = msg
self.error = error
def f(*args, **kwargs):
"""Sleep for the specified time"""
try:
print("[{}---{}] f(args {} kwargs={})".format(getpid(), getppid(), args, kwargs))
t = kwargs["params"]["sleep_time"]
if t == 0:
raise Exception("Exception!! sleep time = 0")
sleep(t)
return {"f_answer": 0.0, "pid": getpid(), "job_id": kwargs["job_id"]}
except Exception as e:
raise JobError(kwargs["job_id"], getpid(), "Exception in except in f", e)
class JobController(object):
def __init__(self, num_cpu:int=0):
"""
Specify the number of cores to use. If it is 0, the core managed by the OS
"""
print("main pid={} cpu_count={}".format(getpid(), cpu_count()))
num_cpu = cpu_count()
self._pool = Pool(num_cpu)
self._working_jobs = {} # job_Let id be the key. In the process of
self._closed_jobs = {} # job_Let id be the key. End
self._new_job_id = 0 #Job to use for the next submitted job_id
def __del__(self):
pass #What you should do to avoid leaving zombies (not yet known).
def my_cb(self, *args):
"""Move the result of a successfully completed Job to the buffer"""
print("callback args={} jobid={}".format(args, args[0]["job_id"]) )
try:
jobid = args[0]["job_id"]
self._closed_jobs[jobid] = self._working_jobs.pop(jobid, {})
self._closed_jobs[jobid]["end_time"] = datetime.now()
self._closed_jobs[jobid]["successful"] = True
self._closed_jobs[jobid]["return"] = args
except:
pass
if len(self._closed_jobs) == 0:
self._pool.join()
def my_err_cb(self, args):
"""Move the result of the job that ended abnormally to the buffer. args is JobError"""
print("error callback args={} job_id={}".format(args, args.job_id) )
try:
jobid = args.job_id
self._closed_jobs[jobid] = self._working_jobs.pop(jobid, {})
self._closed_jobs[jobid]["end_time"] = datetime.now()
self._closed_jobs[jobid]["successful"] = False
self._closed_jobs[jobid]["return"] = args
except:
pass
if len(self._closed_jobs) == 0:
self._pool.join()
def PushJob(self, params:dict):
"""Drop the Job. Arguments are given here as dictionary type data. """
print("PushJob ", getpid(), getppid())
res = self._pool.apply_async(f, args=(1,), kwds={"params":params, "job_id":self._new_job_id},
callback=self.my_cb, error_callback=self.my_err_cb)
self._working_jobs[self._new_job_id] = {"start_time": datetime.now(), "async_res": res}
self._new_job_id += 1
def GetCurrentWorkingJobCount(self):
"""Number of jobs in process (introduced but not finished)"""
return len(self._working_jobs)
def GetCurrentClosedJobCount(self):
"""Number of completed Jobs"""
return len(self._closed_jobs)
if __name__ == "__main__":
try:
print("main pid = {} ppid={}".format(getpid(), getppid()))
job_controller = JobController(0)
# 0.Submit Jobs every 5 seconds.
for i in range(10):
params = {"values": random.randn(3), "sleep_time": i % 7}
job_controller.PushJob(params)
sleep(0.5)
#The state is output until there are no more Jobs in progress.
while True:
print("working_jobs {}:", job_controller.GetCurrentWorkingJobCount())
print(json.dumps(job_controller._working_jobs, indent=2, default=_data_handler))
print("closed_jobs {}:", job_controller.GetCurrentClosedJobCount())
print(json.dumps(job_controller._closed_jobs, indent=2, default=_data_handler))
if job_controller.GetCurrentWorkingJobCount() == 0:
break
sleep(3)
except:
pass
For the time being, it works as expected. Both normal termination and abnormal termination are piled up in self._closd_jobs, and finally _working_jobs becomes 0. The start and end times were also recorded correctly.
--There seems to be no problem with the transfer of information and the input of processes.
It was like that.
The issues are as follows.
--The function to end the process being worked on is not implemented. ――Is it okay to continue using Pool without doing the work like clearing? Is worrisome. ――Can you change the number of cores used in Pool on the way? ――What happens when you use it with Kubernetes?
However, I wonder if I can survive the work for the time being. .. .. I am satisfied for the time being and finish. (2020/04/19, 18:17)
――Actually, if you try to use it in your own class that inherits Thread,
NotImplementedError: pool objects cannot be passed between processes or pickled
I got an error like this. I solved it, but it seemed that I was putting my own type of data (GCP PubSub message) directly into ** kwargs (dictionary type data). I don't know if Thread is involved. (2020/04/20)
--When I try to Pool apply_async again in the apply_async function, I get AssertionError: daemonic processes are not allowed to have children
. The challenge is how to deal with this. Tears (2020/05/13)
Recommended Posts