Effectuer le traitement à l'aide de Celery, qui effectue un traitement de TaskQueue distribué J'ai décrit l'échantillon.
Installation de céleri
pip install celery
Dans le cas des fenêtres, céleri 4 ou version ultérieure n'est pas pris en charge, donc Spécifie la dernière version de Windows prise en charge.
Installation de céleri (fenêtres)
pip install celery==3.1.25
Utilisez la méthode de travail pour exécuter réellement le processus.
tasks.py
from celery import Celery
app = Celery('tasks', result='rpc://', broker='amqp://[email protected]//')
@app.task
def add(x, y):
return x, y
Commencez ceci en tant que travailleur. Utilisez Rabbit MQ démarré à 192.168.0.3. Spécifiez rpc: // où le résultat sera enregistré (il semble que ce soit le backend du résultat). S'il s'agit d'une production, il semble que Redis etc. sera la destination de stockage.
$ celery -A tasks worker --loglevel=info
the ability to execute any command. It's important to secure
your broker from unauthorized access when using pickle, so we think
that enabling pickle should require a deliberate action and not be
the default choice.
If you depend on pickle then you should set a setting to disable this
warning and to be sure that everything will continue working
when you upgrade to Celery 3.2::
CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']
You must only enable the serializers that you will actually use.
warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED))
-------------- celery@DESKTOP-GJOIME5 v3.1.25 (Cipater)
---- **** -----
--- * *** * -- Windows-10-10.0.14393-SP0
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: tasks:0x22d0e56d080
- ** ---------- .> transport: amqp://guest:**@192.168.0.3:5672//
- ** ---------- .> results: disabled://
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ----
--- ***** ----- [queues]
-------------- .> celery exchange=celery(direct) key=celery
[tasks]
. tasks.add
[2017-06-19 06:45:10,040: INFO/MainProcess] Connected to amqp://guest:**@192.168.0.3:5672//
[2017-06-19 06:45:10,118: INFO/MainProcess] mingle: searching for neighbors
[2017-06-19 06:45:11,262: INFO/MainProcess] mingle: all alone
[2017-06-19 06:45:11,332: WARNING/MainProcess] celery@DESKTOP-GJOIME5 ready.
Il a démarré en toute sécurité.
Code de l'appelant
>>> from tasks import add
>>> async_result = add.delay(1,2)
>>> async_result
<AsyncResult: 69bf0ccf-6e74-46e0-ae5a-1fb566bb0657>
#Il est lié au résultat stocké dans Redis etc. en utilisant uuid de AsyncResult???
>>> async_result.ready()
True
>>> async_result.result
3
Les tâches peuvent être mises en file d'attente en appelant avec la méthode delay. Le résultat peut être obtenu à partir de result après que le résultat de result.ready () devienne True.
Comportement lors du lancement d'une tâche dans un worker qui a déjà été démarré. Autant que je puisse voir, il semble que la tâche spécifiée puisse être exécutée en toute sécurité.
Comportement du côté des travailleurs
[2017-06-19 06:56:23,934: INFO/MainProcess] Received task: tasks.add[ff679978-8edd-47db-b599-79aa3c8844eb]
[2017-06-19 06:56:23,934: INFO/MainProcess] Task tasks.add[ff679978-8edd-47db-b599-79aa3c8844eb] succeeded in
0s: 3
Pour enregistrer le résultat de l'exécution de la tâche, spécifiez le backend lors de la création d'une instance de Celery. Cette fois, rpc: // est spécifié, mais il semble recommandé de le stocker dans Redis etc. en production. (https://blog.ozacc.com/docs/celery/getting-started/first-steps-with-celery.html#keeping-results)
La mise en file d'attente des tâches avec Celery s'est avérée assez facile. Pour un mécanisme de traitement des tâches simple, utilisez Céleri + RabbitMQ + destination d'enregistrement des résultats. Il semble qu'il puisse être créé rapidement.
Recommended Posts