-Traitement asynchrone en combinant django et celery Flux jusqu'à l'enregistrement du résultat de l'utilisation de django-celery-results
--Environnement - python: v.3.7.7 - redis: v.6.0.9
--Modifier settings.py
INSTALLED_APPS = [
...
'django_celery_results', #ajouter à
]
CELERY_BROKER_URL = "redis:// [redis host]:6379" #Spécifiez redis comme courtier
CELERY_RESULT_BACKEND = "django-db" #Le résultat est enregistré dans la base de données spécifiée par django. Cet article suppose MySQL.
CELERY_TASK_TRACK_STARTED = True #Paramètres de confirmation du démarrage de la tâche (décrits plus loin)
├── myapp
│ ├── myapp
│ │ ├── __init__.py
│ │ ├── settings.py
│ │ ├── celery.py
│ │ ├── urls.py
│ │ ├── tasks
│ │ │ ├── __init__.py
│ │ │ └── task.py
│ │ └── wsgi.py
│ └── manage.py
├── docker-compose.yml
└── requirements.txt
--Créer celery.py
--Créez le celery.py
suivant sous le répertoire de l'application (même niveau que urls.py) (presque le même que le document officiel)
# celery.py
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
# set the default Django settings module for the 'celery' program.
os.environ.setdefault(
'DJANGO_SETTINGS_MODULE',
'myapp.settings'
)
app = Celery('myapp')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object(
'django.conf:settings',
namespace='CELERY'
)
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
--Créer un fichier de tâche
task.py
comme suit##Appelé(=Task)
from __future__ import absolute_import, unicode_literals
from celery import shared_task
@shared_task
def hello(proj_id):
time.sleep(10) #10 secondes pour confirmer que le processus se termine de manière asynchrone
message = "hello"
print(message) #Sortie standard
return message #Renvoie le résultat
--Exécuter l'application Django
$python manage.py makemigrations #Créez un fichier de migration (si nécessaire)
$python manage.py migrate #exécuter migrer
$python manage.py rumserver #Exécution de l'application
# redis-server
$celery -A myapp worker --concurrency=1 -l info
[your_task_function] .delay ()
--Dans l'exemple ci-dessous, il attend juste 10 secondes puis renvoie "bonjour".hello.delay ()
, et l'identifiant peut être référencé.from django_app.tasks import hello
from celery.result import AsyncResult
def hoge():
task = hello.delay(project_id) #C'est tout ce dont vous avez besoin pour appeler le traitement asynchrone. Après cela, le processus se déroule de manière asynchrone
print("===============")
print(task.id) #Vous pouvez vérifier l'ID attribué de manière unique.
--Vérifiez l'état de la tâche à l'aide de l'ID de tâche obtenu dans l'exemple ci-dessus.
--Utilisez AsyncResult
pour vérifier l'état et le résultat
status
--PENDING
: En attente d'exécution.
--STARTED
: état de démarrage de l'exécution. (Elle n'est pas sortie par défaut. La méthode de sortie sera décrite plus loin.)
-- SUCCÈS
: Exécution terminée normalement.
--FAILED
: l'exécution s'est terminée anormalement.from django_app.tasks import hello
from celery.result import AsyncResult
def hoge():
task = hello.delay(project_id) #C'est tout ce dont vous avez besoin pour appeler le traitement asynchrone. Après cela, le processus se déroule de manière asynchrone
print("===============")
task_id = task.id
print(task_id) #Vous pouvez vérifier l'ID attribué de manière unique.
task_1 = AsyncResult(task_id)
print(task_1.status) #Vérifiez l'état immédiatement après le début du traitement
time.sleep(1)
print("===============")
task_2 = AsyncResult(task_id)
print(task_2.status) #Vérifiez l'état 1 seconde après le traitement
--Enregistrer le résultat dans DB «Je ne pense pas que vous l'utilisiez beaucoup, mais comme méthode. ..
CELERY_RESULT_BACKEND
dans settings.py, l'état / le résultat du traitement asynchrone est automatiquement enregistré dans la base de données. Dans cet article, nous supposerons MySQL.from django_app.tasks import hello
from django_celery_results.models import TaskResult
def hoge():
task = hello.delay(project_id) #C'est tout ce dont vous avez besoin pour appeler le traitement asynchrone. Après cela, le processus se déroule de manière asynchrone
print("===============")
task_id = task.id #Vous pouvez vérifier l'ID attribué de manière unique.
print(task_id)
task_model_1 = TaskResult.objects.filter(task_id=task_id)
if len(task_model_1) > 0:
print(task_model_1[0].status) #Vérifiez l'état immédiatement après le début du traitement
time.sleep(1)
task_model_2 = TaskResult.objects.filter(task_id=task_id)
if len(task_model_2) > 0:
print(task_model_2[0].status) #Vérifiez l'état 1 seconde après le traitement
#Contenu de MySQL
##Le tableau est le suivant.
mysql> show tables;
+----------------------------------+
| Tables_in_nodeai |
+----------------------------------+
| auth_group |
| auth_group_permissions |
| auth_permission |
| django_admin_log |
| django_celery_results_taskresult | ## <-Cela a été ajouté!
...
## django_celery_results_Le schéma de résultat de la tâche est le suivant.
mysql> desc django_celery_results_taskresult;
+------------------+--------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+------------------+--------------+------+-----+---------+----------------+
| id | int(11) | NO | PRI | NULL | auto_increment |
| task_id | varchar(255) | NO | UNI | NULL | |
| status | varchar(50) | NO | MUL | NULL | |
| content_type | varchar(128) | NO | | NULL | |
| content_encoding | varchar(64) | NO | | NULL | |
| result | longtext | YES | | NULL | |
| date_done | datetime(6) | NO | MUL | NULL | |
| traceback | longtext | YES | | NULL | |
| meta | longtext | YES | | NULL | |
| task_args | longtext | YES | | NULL | |
| task_kwargs | longtext | YES | | NULL | |
| task_name | varchar(255) | YES | MUL | NULL | |
| worker | varchar(100) | YES | MUL | NULL | |
| date_created | datetime(6) | NO | MUL | NULL | |
+------------------+--------------+------+-----+---------+----------------+
--Si CELERY_TASK_TRACK_STARTED
n'est pas spécifié dans settings.py, la tâche sera exécutée jusqu'à ce que le résultat soit obtenu.
AsyncResult
est toujours PENDING
--Le contenu de la tâche n'est pas conservé dans le DB. (Enregistré uniquement lorsque le résultat est disponible)settings.py
CELERY_TASK_TRACK_STARTED = True
--Si vous soulevez une erreur avec relance, le résultat de la tâche sera FAILED
task.result
##Appelé(=Task)
from __future__ import absolute_import, unicode_literals
from celery import shared_task
@shared_task
def hello(proj_id):
time.sleep(10) #10 secondes pour confirmer que le processus se termine de manière asynchrone
message = "hello"
raise Exception("my error message") #Notification d'erreur
#Résultats stockés dans MySQL
mysql> select * from django_celery_results_taskresult\G
...
*************************** 31. row ***************************
id: 31
task_id: be294008-d2fc-4760-9055-483efdaa4970
status: FAILURE
content_type: application/json
content_encoding: utf-8
result: {"exc_type": "Exception", "exc_message": ["my error message"], "exc_module": "builtins"}
date_done: 2020-11-10 08:06:32.848782
traceback: Traceback (most recent call last):...
meta: {"children": []}
task_args: (4,)
task_kwargs: {}
task_name: myapp.tasks.task.hello
worker: celery@05ab2e4b5ee1
date_created: 2020-11-10 08:06:22.829301
Recommended Posts