Another personal memo.
Since it looks like ↓, I wrote it because I wanted to separate the process according to the request source and the type of message when waiting for reception.
main.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import procs
workers = {}
source = procs.Source()
try:
while True:
#Poll from source
data = source.poll()
key = data['key']
#Generated if there is no Worker corresponding to key
if key not in workers:
workers[key] = procs.Worker()
#Delegate processing to worker
workers[key].delegate(data)
finally:
#Terminate Worker when finished with Keyboard Interrupt etc.
source.terminate()
[ w.terminate() for _,w in workers ]
procs.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import abc
import multiprocessing
class Proc(metaclass=abc.ABCMeta):
def __init__(self):
self._stop = multiprocessing.Event()
self._queue = multiprocessing.SimpleQueue()
self._process = multiprocessing.Process(target=self._run)
self._process.start()
def terminate(self):
self._stop.set()
self._process.join()
self._process.terminate()
def _run(self):
while not self._stop.is_set():
self._do()
@abc.abstractmethod
def _do(self, **kwargs):
pass
class Source(Proc):
def _do(self, **kwargs):
#Receiving messages
data = { 'key' : 'some-data' }
self._queue.put(data)
def poll(self):
return self._queue.get()
class Worker(Proc):
def _do(self, **kwargs):
data = self._queue.get()
#Process the received data
def delegate(self, data):
return self._queue.put(data)
Recommended Posts