I wrote up a sample of distributed task queue processing using Celery.
celery installation
pip install celery
On Windows, Celery 4 and later are not supported, so specify the last version that still supports Windows.
Installation of celery (windows)
pip install celery==3.1.25
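To confirm which version actually got installed, you can check it from Python itself:
python -c "import celery; print(celery.__version__)"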
The processing itself is performed by a worker. First, define a task:
tasks.py
from celery import Celery

# Result backend is rpc://, broker is RabbitMQ at 192.168.0.3
app = Celery('tasks', backend='rpc://', broker='amqp://[email protected]//')

@app.task
def add(x, y):
    return x + y
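As a side note, calling the task function directly runs it locally in the current process without going through the broker, which is handy for a quick check:
>>> from tasks import add
>>> add(1, 2)  # runs immediately in-process; no worker or broker involved
3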
Start this as a worker. RabbitMQ running at 192.168.0.3 is used as the broker, and rpc:// is specified as the result backend (where task results are stored). Note that the keyword argument is backend, not result; in my original run I passed result='rpc://', which Celery silently ignored, which is why the worker banner below shows results: disabled://. For production, a store such as Redis seems to be the usual choice.
$ celery -A tasks worker --loglevel=info
The worker starts with a pickle deprecation warning, followed by the startup banner:
Starting from version 3.2 Celery will refuse to accept pickle by default.

The pickle serializer is a security concern as it may give attackers
the ability to execute any command.  It's important to secure
your broker from unauthorized access when using pickle, so we think
that enabling pickle should require a deliberate action and not be
the default choice.

If you depend on pickle then you should set a setting to disable this
warning and to be sure that everything will continue working
when you upgrade to Celery 3.2::

    CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']

You must only enable the serializers that you will actually use.

warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED))
-------------- celery@DESKTOP-GJOIME5 v3.1.25 (Cipater)
---- **** -----
--- * *** * -- Windows-10-10.0.14393-SP0
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: tasks:0x22d0e56d080
- ** ---------- .> transport: amqp://guest:**@192.168.0.3:5672//
- ** ---------- .> results: disabled://
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ----
--- ***** ----- [queues]
-------------- .> celery exchange=celery(direct) key=celery
[tasks]
. tasks.add
[2017-06-19 06:45:10,040: INFO/MainProcess] Connected to amqp://guest:**@192.168.0.3:5672//
[2017-06-19 06:45:10,118: INFO/MainProcess] mingle: searching for neighbors
[2017-06-19 06:45:11,262: INFO/MainProcess] mingle: all alone
[2017-06-19 06:45:11,332: WARNING/MainProcess] celery@DESKTOP-GJOIME5 ready.
It started up successfully.
Caller code
>>> from tasks import add
>>> async_result = add.delay(1,2)
>>> async_result
<AsyncResult: 69bf0ccf-6e74-46e0-ae5a-1fb566bb0657>
# The uuid of the AsyncResult appears to be the key linking it to the result stored in the backend (Redis etc.)
>>> async_result.ready()
True
>>> async_result.result
3
A task is queued by calling it with the delay() method. Once ready() returns True, the return value can be read from the result attribute.
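Instead of polling ready(), the standard AsyncResult API also offers get(), which blocks until the result arrives (the timeout value here is just an illustration):
>>> async_result = add.delay(3, 4)
>>> async_result.get(timeout=10)  # blocks until the task finishes; raises on timeout
7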
Next, the behavior when a task is thrown at a worker that is already running. As far as I can tell, the specified task is executed without problems.
Behavior on the worker side
[2017-06-19 06:56:23,934: INFO/MainProcess] Received task: tasks.add[ff679978-8edd-47db-b599-79aa3c8844eb]
[2017-06-19 06:56:23,934: INFO/MainProcess] Task tasks.add[ff679978-8edd-47db-b599-79aa3c8844eb] succeeded in 0s: 3
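Multiple tasks can also be queued at once. Here is a minimal sketch using Celery's group primitive (not part of the original sample):
>>> from celery import group
>>> job = group(add.s(i, i) for i in range(5))
>>> group_result = job.apply_async()
>>> group_result.get()  # waits for all tasks and collects their return values
[0, 2, 4, 6, 8]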
To save task execution results, specify the backend when creating the Celery instance. rpc:// is used here, but storing results in Redis etc. seems to be recommended for production use (https://blog.ozacc.com/docs/celery/getting-started/first-steps-with-celery.html#keeping-results).
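For reference, a minimal sketch of switching the result backend to Redis; the host 192.168.0.4 is a hypothetical placeholder, and the Redis client package (pip install redis) must be installed:
from celery import Celery

# Hypothetical Redis host/port/db as the result backend; the broker stays RabbitMQ
app = Celery('tasks',
             backend='redis://192.168.0.4:6379/0',
             broker='amqp://[email protected]//')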
Task queuing with Celery turned out to be fairly easy. A simple task processing mechanism can be put together quickly with Celery + RabbitMQ + a result backend.