Exemple d'implémentation de file d'attente de travaux utilisant le collout de Tornado

C'est un mémo quand j'ai fait une file d'attente avec le collout de Tornado.

Les travaux de file d'attente (fonctions et arguments) avec la méthode de mise de WorkerQueue. Dès que le travail en cours d'exécution est terminé, les travaux de la file d'attente seront exécutés un par un, en commençant par le plus ancien.

J'utilise la file d'attente implémentée dans la version 4.2 de Tornado. Les files d'attente Tornado sont similaires à la file d'attente de synchronisation de la bibliothèque standard Python (queue.Queue), sauf que put et get return tornado.concurrent.Future.

filename


from concurrent.futures import ProcessPoolExecutor
import time

from tornado import ioloop, gen, process
from tornado.queues import Queue


class WorkerQueue(object):
    def __init__(self):
        self.queue = Queue()
        self.current_worker_id = None
        self.current_worker = None
        self.queued_ids = []
        self._dispatcher()

    def put(self, id_, func, args):
        worker = Worker(func, args)
        self.queued_ids.append(id_)
        self.queue.put_nowait((id_, worker))
        print("Put: {}".format(id_))

    def status(self, id_):
        if id_ in self.queued_ids:
            return "Queued"
        elif id_ == self.current_worker_id:
            return "Running"
        else:
            return "Ready"

    @gen.coroutine
    def _dispatcher(self):
        while 1:
            id_, worker = yield self.queue.get()
            self.queued_ids.remove(id_)
            self.current_worker_id = id_
            self.current_worker = worker
            print("Start: {}".format(id_))
            res = yield self.current_worker.execute()
            self.current_worker_id = None
            self.current_worker = None
            print("{} is {}.".format(id_, res))


class Worker(object):
    def __init__(self, func, args):
        self.func = func
        self.args = args

    @gen.coroutine
    def execute(self):
        with ProcessPoolExecutor(process.cpu_count()) as exec_:
            res = yield exec_.submit(self.func, *self.args)
        return res


def job(sec):
    time.sleep(sec)
    return "done"


@gen.coroutine
def run():
    q = WorkerQueue()
    q.put("Job1", job, (2,))
    q.put("Job2", job, (2,))
    print("Job1 <{}>".format(q.status("Job1")))
    print("Job2 <{}>".format(q.status("Job2")))
    yield gen.sleep(1)
    print("Job1 <{}>".format(q.status("Job1")))
    print("Job2 <{}>".format(q.status("Job2")))
    yield gen.sleep(2)
    print("Job1 <{}>".format(q.status("Job1")))
    print("Job2 <{}>".format(q.status("Job2")))
    yield gen.sleep(2)
    print("Job1 <{}>".format(q.status("Job1")))
    print("Job2 <{}>".format(q.status("Job2")))


if __name__ == "__main__":
    ioloop.IOLoop.current().run_sync(run)

Résultat d'exécution

Put: Job1
Put: Job2
Job1 <Queued>
Job2 <Queued>
Start: Job1
Job1 <Running>
Job2 <Queued>
Job1 is done
Start: Job2
Job1 <Ready>
Job2 <Running>
Job2 is done
Job1 <Ready>
Job2 <Ready>

(Ajouté le 07.11.2015) Lorsqu'il s'agit d'une grande quantité de données, l'utilisation de la mémoire devient un problème, il est donc nécessaire de prendre des mesures telles que limiter la longueur de la file d'attente et faire attendre le producteur.

Recommended Posts

Exemple d'implémentation de file d'attente de travaux utilisant le collout de Tornado
Traitement asynchrone à l'aide de Linebot dans la file d'attente des travaux
Exemple d'utilisation de lambda
Détection d'anomalies par encodeur automatique à l'aide de keras [Exemple d'implémentation pour les débutants]