En lisant le document sur le céleri, qu'y a-t-il? Document officiel et à mâcher vous-même.
The primitives
Map & Starmap, Chunks n'ont pas été examinés.
Chains
Exécutez les tâches en série. Les tâches suivantes reçoivent le résultat de l'exécution de la tâche précédente.
** Faites attention à la signature de chaque tâche. ** **
from celery import chain
# `add.s(4, 4)`Le résultat de`mul.s(8)`Passez à.そLe résultat de`mul.s(10)`Passez à.
chain(add.s(4, 4), mul.s(8), mul.s(10))
Groups
Exécutez plusieurs tâches en parallèle.
from celery import group
# `task.s(2, 2)`Quand`task.s(4, 4)`Est exécuté en parallèle
group(
task.s(2, 2),
task.s(4, 4)
)
Chords
Les résultats de l'exécution de plusieurs tâches peuvent être transmis au rappel.
from celery import chord
#Résultats d'exécution multiples Tsum.s()Passer au
chord(add.s(i, i) for i in xrange(100))(tsum.s()).get()
Dans cet exemple, ʻadd.s (i, i) pour i dans xrange (100) est une tâche à exécuter en parallèle, et le résultat de l'exécution (liste?) Est passé au rappel
tsum.s () `.
1 pre-processor + n post-processors
Je voulais faire ça et j'ai cherché!
Cela ressemble à une échelle de temps.
Assumer des tâches telles que l'exécution de la sauvegarde d'image comme prétraitement, le traitement d'image comme post-traitement, le redimensionnement, etc. en même temps.
@task
def main_task(object_id):
#Tâches de prétraitement
#Beau traitement
#Objet pour les tâches suivantes_Renvoie l'ID
return object_id
@task
def sub_task_1(object_id):
#Tâches de post-traitement
pass
@task
def sub_task_2(object_id):
#Tâches de post-traitement
pass
#Construisez une chaîne de tâches entières. chaîne()Et groupe.
chains = chain(
main_task.s(object.pk),
group(
sub_task_1.s(),
sub_task_2.s()
)
)
#Exécutez la chaîne
# main_Une fois l'exécution de la tâche terminée, sous_task_1, sub_task_2 est exécuté en parallèle.
chains.apply_async()
Le but est de faire correspondre les signatures reçues par les tâches suivantes dans le groupe.
1 processor + mapped post processors
Exécutez plusieurs sorties du prétraitement et exécutez plusieurs tâches suivantes en parallèle. Similaire au flux précédent, sauf que le pré-processus génère plusieurs résultats (nombre indéfini).
@task
def pre_process_task(object_id):
#Tâches de prétraitement
#Beau traitement
#Renvoie une liste d'objets qui seront traités
return [1, 2, 3, 4, 5 ...]
@task
def post_process_task(object_id):
#Tâches de post-traitement
#Conception pour recevoir des objets individuels
pass
@task
def dmap(it, callback):
#Recevez la liste et transmettez-la à callbak
callback = subtask(callback)
return group(callback.clone([arg,]) for arg in it)()
#Construisez une chaîne de tâches entières
chains = chain(
pre_process_task.s(object.pk),
dmap.s(post_process_task.s()
)
#Exécutez la chaîne
# pre_process_publier une fois l'exécution de la tâche terminée_process_processus de tâches en parallèle
chains.apply_async()
Completed task
Si vous utilisez si ()
, cela devient une tâche immuable, vous pouvez donc exécuter la tâche en ignorant la valeur de retour de la tâche précédente.
@task
def main_task(object_id):
#Une tâche
return (object_id, result)
@task
def sub_task_1(args):
#Une tâche
object_id, result = args
return True
@task
def sub_task_2(args):
#Une tâche
object_id, result = args
return True
@task
def finalize_task(object_id):
#Journal d'achèvement de la tâche de sortie
logger.info('Task completed')
return True
object_id = 123
chain(
main_task.s(object_id),
group(
sub_task_1.s(), # main_Utiliser la valeur de retour de la tâche
sub_task_2.s() # main_Utiliser la valeur de retour de la tâche
),
main_completed_task.si(object_id) # s()Pas si()Notez que
).apply_async()