Notes from reading the official Celery docs and working through the primitives myself.
The primitives
Map & Starmap and Chunks are not covered here.
Chains
Execute tasks in series. Each subsequent task receives the result of the previous task.
**Pay attention to the signature of each task.**
from celery import chain
# The result of `add.s(4, 4)` is passed to `mul.s(8)`, and that result is passed to `mul.s(10)`
chain(add.s(4, 4), mul.s(8), mul.s(10))
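To make the data flow concrete, here is a plain-Python sketch of what the chain above evaluates to once the workers finish. No broker or worker is involved; `add` and `mul` are hypothetical stand-ins for ordinary addition and multiplication tasks:

```python
# Plain-Python model of the chain's data flow (not actual Celery execution)
def add(x, y):
    return x + y

def mul(x, y):
    return x * y

# chain(add.s(4, 4), mul.s(8), mul.s(10)) pipes each result into the next task:
result = mul(mul(add(4, 4), 8), 10)
print(result)  # ((4 + 4) * 8) * 10 = 640
```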
Groups
Execute multiple tasks in parallel.
from celery import group
# `task.s(2, 2)` and `task.s(4, 4)` are executed in parallel
group(
    task.s(2, 2),
    task.s(4, 4)
)
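A group's result is one value per signature, in the order the signatures were given. A plain-Python sketch of that shape, assuming `task` simply adds its two arguments:

```python
# Plain-Python model of a group's result shape (not actual Celery execution)
def task(x, y):
    return x + y

# group(task.s(2, 2), task.s(4, 4)) yields a list of results, in order:
results = [task(2, 2), task(4, 4)]
print(results)  # [4, 8]
```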
Chords
Execution results of multiple tasks can be passed to the callback.
from celery import chord
# Pass the results of the parallel tasks to tsum.s()
chord(add.s(i, i) for i in range(100))(tsum.s()).get()
In this example, `add.s(i, i) for i in range(100)` are the tasks executed in parallel, and the list of their results is passed to the callback `tsum.s()`.
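The chord's semantics can be sketched in plain Python, assuming `add` returns the sum of its arguments and `tsum` sums a list (as in the Celery docs):

```python
# Plain-Python model of the chord above (not actual Celery execution)
def add(x, y):
    return x + y

def tsum(numbers):
    return sum(numbers)

# The header tasks run in parallel; their results are collected into a list...
header = [add(i, i) for i in range(100)]
# ...and the list is handed to the callback:
print(tsum(header))  # 9900
```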
1 pre-processor + n post-processors
This is the flow I wanted, so I looked it up.
On a timeline it looks like this:
Assume a pre-processing task such as saving an image, followed by post-processing tasks (image processing, resizing, etc.) that can run at the same time.
@task
def main_task(object_id):
    # Pre-processing task
    # do the actual work here
    # return object_id for the subsequent tasks
    return object_id

@task
def sub_task_1(object_id):
    # Post-processing task
    pass

@task
def sub_task_2(object_id):
    # Post-processing task
    pass

# Build the whole workflow with chain() and group()
chains = chain(
    main_task.s(object.pk),
    group(
        sub_task_1.s(),
        sub_task_2.s()
    )
)

# Run the chain.
# After main_task completes, sub_task_1 and sub_task_2 run in parallel.
chains.apply_async()
The point is that each task in the group must have a signature that accepts the return value of main_task.
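When a partial signature like `sub_task_1.s()` carries no arguments of its own, Celery fills in the previous task's result as its first positional argument. A plain-Python sketch of that argument flow, with hypothetical return values:

```python
# Plain-Python model of how the parent result reaches each group member
def sub_task_1(object_id):
    return ('resized', object_id)

def sub_task_2(object_id):
    return ('thumbnail', object_id)

parent_result = 42  # hypothetical value returned by main_task

# sub_task_1.s() / sub_task_2.s() have no preset arguments,
# so the parent result becomes their first (and only) argument:
outputs = [sub_task_1(parent_result), sub_task_2(parent_result)]
print(outputs)  # [('resized', 42), ('thumbnail', 42)]
```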
1 pre-processor + mapped post-processors
The pre-processing step produces multiple outputs, and a subsequent task runs in parallel for each of them. Similar to the previous flow, except that the pre-processing task returns a variable number of results.
from celery import subtask

@task
def pre_process_task(object_id):
    # Pre-processing task
    # do the actual work here
    # return a list of results
    return [1, 2, 3, 4, 5]  # ...

@task
def post_process_task(object_id):
    # Post-processing task
    # designed to receive a single object
    pass

@task
def dmap(it, callback):
    # Receive the list and map each element onto the callback
    callback = subtask(callback)
    return group(callback.clone([arg]) for arg in it)()

# Build the whole workflow
chains = chain(
    pre_process_task.s(object.pk),
    dmap.s(post_process_task.s())
)

# Run the chain.
# After pre_process_task completes, post_process_task runs in parallel over the results.
chains.apply_async()
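Conceptually, dmap clones the callback signature once per element of the list, so the fan-out behaves like a parallel map. A plain-Python sketch, with a hypothetical `post_process_task` that multiplies each item by 10:

```python
# Plain-Python model of the dmap fan-out (not actual Celery execution)
def pre_process_task(object_id):
    return [1, 2, 3, 4, 5]

def post_process_task(item):
    return item * 10  # hypothetical per-item processing

# dmap.s(post_process_task.s()) receives the list and maps over it:
items = pre_process_task(123)
results = [post_process_task(item) for item in items]
print(results)  # [10, 20, 30, 40, 50]
```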
Completion task
Using si() creates an immutable signature, so the task runs while ignoring the return value of the previous task.
@task
def main_task(object_id):
    # some task
    return (object_id, result)

@task
def sub_task_1(args):
    # some task
    object_id, result = args
    return True

@task
def sub_task_2(args):
    # some task
    object_id, result = args
    return True

@task
def finalize_task(object_id):
    # log that the whole workflow finished
    logger.info('Task completed')
    return True

object_id = 123
chain(
    main_task.s(object_id),
    group(
        sub_task_1.s(),  # uses the return value of main_task
        sub_task_2.s()   # uses the return value of main_task
    ),
    finalize_task.si(object_id)  # note si(), not s()
).apply_async()
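The difference between s() and si() can be modeled in plain Python: a mutable signature has the parent result prepended to its preset arguments, while an immutable one keeps only its preset arguments. This is a toy model of that rule, not the Celery API:

```python
# Toy model of mutable vs. immutable signature argument handling
def apply_signature(func, preset_args, parent_result, immutable):
    # s():  the parent result is prepended to the preset arguments.
    # si(): the parent result is dropped.
    args = preset_args if immutable else (parent_result,) + preset_args
    return func(*args)

def finalize_task(object_id):
    return f'completed: {object_id}'

# si(123): the parent result (a list of True values from the group) is ignored
print(apply_signature(finalize_task, (123,), True, immutable=True))
# → 'completed: 123'

# s() with no preset arguments would instead receive the parent result:
print(apply_signature(lambda x: x, (), True, immutable=False))
# → True
```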