J'étudie depuis hier, mais je sens que je me suis enfin rapproché de la forme que je veux utiliser, alors je vais faire une note ici à nouveau.
--Je souhaite exécuter le traitement pour une entrée asynchrone sans le bloquer dans un processus séparé. ――Je veux bien attraper la fin
Je voulais m'en rendre compte. J'ai laissé un mot hier, mais j'ai l'impression de m'être installé ici.
Comme spécification
Alors
apply_async
.
--La fonction de rappel définie lors de l'application_async est également définie comme méthode de la classe afin que la file d'attente qui est membre de la classe soit accessible.
-En interne (je ne sais pas si c'est bon), il est géré en utilisant job_id. Implémentez de sorte que le numéro soit unique pour la demande de traitement d'entrée.Je l'ai écrit comme ça.
from multiprocessing import Pool
from time import sleep
from os import getpid, getppid, cpu_count
from datetime import datetime, time, date, timedelta
import sys, json
def _data_handler(o):
""" json.par défaut avec vidage=_data_Utiliser comme gestionnaire"""
if isinstance(o, (datetime, date) ):
return o.strftime('%Y/%m/%d %H:%M:%S')
elif hasattr(o, "isoformat"):
return o.isoformat()
else:
return str(o)
class JobError(Exception):
def __init__(self, job_id:int, pid:int, msg:str, error):
self.job_id = job_id
self.pid = pid
self.msg = msg
self.error = error
def f(*args, **kwargs):
"""Dormez pendant la durée spécifiée"""
try:
print("[{}---{}] f(args {} kwargs={})".format(getpid(), getppid(), args, kwargs))
t = kwargs["params"]["sleep_time"]
if t == 0:
raise Exception("Exception!! sleep time = 0")
sleep(t)
return {"f_answer": 0.0, "pid": getpid(), "job_id": kwargs["job_id"]}
except Exception as e:
raise JobError(kwargs["job_id"], getpid(), "Exception in except in f", e)
class JobController(object):
def __init__(self, num_cpu:int=0):
"""
Spécifiez le nombre de cœurs à utiliser. Si 0, le noyau géré par le système d'exploitation
"""
print("main pid={} cpu_count={}".format(getpid(), cpu_count()))
num_cpu = cpu_count()
self._pool = Pool(num_cpu)
self._working_jobs = {} # job_Laissez id être la clé. Dans le processus de
self._closed_jobs = {} # job_Laissez id être la clé. Fin
self._new_job_id = 0 #Travail à utiliser pour le prochain travail soumis_id
def __del__(self):
pass #Que faire pour éviter de laisser des zombies (pas encore connu).
def my_cb(self, *args):
"""Déplacer le résultat d'un Job terminé avec succès vers la mémoire tampon"""
print("callback args={} jobid={}".format(args, args[0]["job_id"]) )
try:
jobid = args[0]["job_id"]
self._closed_jobs[jobid] = self._working_jobs.pop(jobid, {})
self._closed_jobs[jobid]["end_time"] = datetime.now()
self._closed_jobs[jobid]["successful"] = True
self._closed_jobs[jobid]["return"] = args
except:
pass
if len(self._closed_jobs) == 0:
self._pool.join()
def my_err_cb(self, args):
"""Déplacez le résultat du travail qui s'est terminé anormalement dans la mémoire tampon. args est JobError"""
print("error callback args={} job_id={}".format(args, args.job_id) )
try:
jobid = args.job_id
self._closed_jobs[jobid] = self._working_jobs.pop(jobid, {})
self._closed_jobs[jobid]["end_time"] = datetime.now()
self._closed_jobs[jobid]["successful"] = False
self._closed_jobs[jobid]["return"] = args
except:
pass
if len(self._closed_jobs) == 0:
self._pool.join()
def PushJob(self, params:dict):
"""Supprimez le travail. L'argument est donné ici sous forme de données de type dictionnaire. """
print("PushJob ", getpid(), getppid())
res = self._pool.apply_async(f, args=(1,), kwds={"params":params, "job_id":self._new_job_id},
callback=self.my_cb, error_callback=self.my_err_cb)
self._working_jobs[self._new_job_id] = {"start_time": datetime.now(), "async_res": res}
self._new_job_id += 1
def GetCurrentWorkingJobCount(self):
"""Nombre de travaux en cours (lancés mais non terminés)"""
return len(self._working_jobs)
def GetCurrentClosedJobCount(self):
"""Nombre de travaux terminés"""
return len(self._closed_jobs)
if __name__ == "__main__":
try:
print("main pid = {} ppid={}".format(getpid(), getppid()))
job_controller = JobController(0)
# 0.Soumettre des travaux toutes les 5 secondes.
for i in range(10):
params = {"values": random.randn(3), "sleep_time": i % 7}
job_controller.PushJob(params)
sleep(0.5)
#L'état est affiché jusqu'à ce qu'il n'y ait plus de Jobs en cours.
while True:
print("working_jobs {}:", job_controller.GetCurrentWorkingJobCount())
print(json.dumps(job_controller._working_jobs, indent=2, default=_data_handler))
print("closed_jobs {}:", job_controller.GetCurrentClosedJobCount())
print(json.dumps(job_controller._closed_jobs, indent=2, default=_data_handler))
if job_controller.GetCurrentWorkingJobCount() == 0:
break
sleep(3)
except:
pass
Pour le moment, cela fonctionne comme prévu. La terminaison normale et la terminaison anormale sont empilées dans self._closd_jobs, et finalement _working_jobs devient 0. Les heures de début et de fin ont également été enregistrées correctement.
C'était comme ça.
Les problèmes sont les suivants.
Cependant, je me demande si je peux survivre au travail pour le moment. .. .. Je suis satisfait pour le moment et je termine. (2020/04/19, 18:17)
――En fait, si vous essayez de l'utiliser dans votre propre classe qui hérite de Thread,
NotImplementedError: pool objects cannot be passed between processes or pickled
J'ai eu une erreur comme celle-ci. Je l'ai résolu, mais il semblait que je mettais mon propre type de données (message GCP PubSub) directement dans ** kwargs (données de type dictionnaire). Je ne sais pas si Thread est impliqué. (2020/04/20)
--Lorsque j'essaye à nouveau de Pool apply_async dans la fonction apply_async, j'obtiens ```AssertionError: les processus démoniaques ne sont pas autorisés à avoir des enfants '' Le défi est de faire face à cela. Larmes (2020/05/13)
Recommended Posts