Un autre mémo personnel.
Puisqu'il ressemble à ↓, je l'ai écrit parce que je voulais séparer le processus en fonction de la source de la requête et du type de message en attente de réception.
main.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import procs
workers = {}
source = procs.Source()
try:
while True:
#Sondage de la source
data = source.poll()
key = data['key']
#Généré s'il n'y a pas de Worker correspondant à la clé
if key not in workers:
workers[key] = procs.Worker()
#Déléguer le traitement au travailleur
workers[key].delegate(data)
finally:
#Mettre fin à Worker lorsque vous avez terminé avec l'interruption du clavier, etc.
source.terminate()
[ w.terminate() for _,w in workers ]
procs.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import abc
import multiprocessing
class Proc(metaclass=abc.ABCMeta):
def __init__(self):
self._stop = multiprocessing.Event()
self._queue = multiprocessing.SimpleQueue()
self._process = multiprocessing.Process(target=self._run)
self._process.start()
def terminate(self):
self._stop.set()
self._process.join()
self._process.terminate()
def _run(self):
while not self._stop.is_set():
self._do()
@abc.abstractmethod
def _do(self, **kwargs):
pass
class Source(Proc):
def _do(self, **kwargs):
#Recevoir des messages
data = { 'key' : 'some-data' }
self._queue.put(data)
def poll(self):
return self._queue.get()
class Worker(Proc):
def _do(self, **kwargs):
data = self._queue.get()
#Traiter les données reçues
def delegate(self, data):
return self._queue.put(data)
Recommended Posts