Flow of getting the result of asynchronous processing using Django and Celery


--How to run asynchronous processing by combining Django and Celery, and how to save its result using django-celery-results
--A memo on the stumbling block where the status stays PENDING and the result is not written to the DB until the asynchronous process has finished


--Environment
  --python: v.3.7.7
  --redis: v.6.0.9


--Edit settings.py

INSTALLED_APPS = [
    # ... existing apps ...
    'django_celery_results', #Add this
]

CELERY_BROKER_URL = "redis://[redis host]:6379" #Specify redis as the broker
CELERY_RESULT_BACKEND = "django-db" #Results are saved in the DB configured for Django. This article assumes MySQL.
CELERY_TASK_TRACK_STARTED = True #Setting for confirming that the task has started (described later)
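
The [redis host] part of the broker URL usually differs between environments, for example when Redis runs in its own container. As a minimal sketch, the URL could be built from an environment variable. The variable name REDIS_HOST and the helper build_broker_url are assumptions made for this illustration and are not part of the original setup.

```python
import os


def build_broker_url(default_host="localhost", port=6379):
    """Return a Redis broker URL, reading the host from REDIS_HOST if it is set.

    REDIS_HOST is a hypothetical variable name chosen for this sketch.
    """
    host = os.environ.get("REDIS_HOST", default_host)
    return "redis://{}:{}".format(host, port)


# In settings.py this value would replace the hard-coded URL.
CELERY_BROKER_URL = build_broker_url()
```

With REDIS_HOST unset, the helper falls back to localhost, which is convenient when running everything on one machine.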


--Executable file structure
  --The files created below are laid out as follows

├── myapp
│   ├── myapp
│   │   ├── __init__.py
│   │   ├── settings.py
│   │   ├── celery.py
│   │   ├── urls.py
│   │   ├── tasks
│   │   │   ├── __init__.py
│   │   │   └── task.py
│   │   └── wsgi.py
│   └── manage.py
├── docker-compose.yml
└── requirements.txt

--Create celery.py
  --Create the following celery.py in the project package directory (same level as urls.py). It is almost the same as the one in the official documentation.

# celery.py
from __future__ import absolute_import, unicode_literals

import os

from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myapp.settings')

app = Celery('myapp')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

--Create task file
  --To manage multiple asynchronous scripts, create a tasks directory and place the task scripts under it.
  --Prepare task.py as follows

from __future__ import absolute_import, unicode_literals

import time

from celery import shared_task

@shared_task
def hello(proj_id):
    time.sleep(10) #Wait 10 seconds so you can confirm that the process runs asynchronously
    message = "hello"
    print(message) #Standard output
    return message #Returns the result


--Run django application

$ python manage.py makemigrations #Create migration files (if needed)
$ python manage.py migrate #Run the migrations
$ python manage.py runserver #Start the application

--Redis launch

# redis-server

--Celery launch
  --cd to the directory where manage.py is located and execute the following command

$ celery -A myapp worker --concurrency=1 -l info

Execute Task to get ID

--Tasks can be executed with [your_task_function].delay()
--In the example below, the task just waits 10 seconds and then returns "hello".
--To check on a task, you need the ID assigned to it.
--hello.delay() returns an AsyncResult object, and the task ID can be read from its id attribute.
--By storing the task ID in a DB or similar, you can check the task status at any time from inside the application or from another process.

from myapp.tasks.task import hello

def hoge(project_id):
    task = hello.delay(project_id) #This is all you need to start asynchronous processing. The task then runs in the worker
    print(task.id) #You can check the uniquely assigned ID.
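
The idea of holding the task ID in a DB so it can be looked up later can be sketched with the standard-library sqlite3 module. This is only an illustration under stated assumptions: the table name task_ids, the helper functions, and the use of sqlite3 are all hypothetical (in a Django project a model would be the more natural choice), and a random UUID stands in for the value of task.id.

```python
import sqlite3
import uuid


def open_store(path=":memory:"):
    """Open (or create) a small table mapping a project ID to a task ID."""
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS task_ids ("
        "project_id INTEGER PRIMARY KEY, task_id TEXT NOT NULL)"
    )
    return conn


def save_task_id(conn, project_id, task_id):
    """Remember the latest task ID for a project, replacing any older one."""
    with conn:  # commits on success
        conn.execute(
            "INSERT OR REPLACE INTO task_ids (project_id, task_id) VALUES (?, ?)",
            (project_id, task_id),
        )


def load_task_id(conn, project_id):
    """Return the stored task ID, or None if nothing was saved."""
    row = conn.execute(
        "SELECT task_id FROM task_ids WHERE project_id = ?", (project_id,)
    ).fetchone()
    return row[0] if row else None


conn = open_store()
fake_task_id = str(uuid.uuid4())  # stand-in for task.id returned by hello.delay()
save_task_id(conn, 4, fake_task_id)
```

Another process that opens the same database file could call load_task_id and pass the result to AsyncResult to check the status.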

Check the status of Task

--Check the task status using the task ID obtained in the above example.
--Use AsyncResult to check the status and result
  --You can get the task result object by passing the task ID.
  --You can get the status of the task with the status attribute
    --PENDING: Waiting for execution.
    --STARTED: Execution has started. (Not reported by default. How to enable it is described later.)
    --SUCCESS: Execution completed normally.
    --FAILURE: Execution terminated abnormally.

import time

from celery.result import AsyncResult
from myapp.tasks.task import hello

def hoge(project_id):
    task = hello.delay(project_id) #This is all you need to start asynchronous processing. The task then runs in the worker
    task_id = task.id
    print(task_id) #You can check the uniquely assigned ID.
    task_1 = AsyncResult(task_id)
    print(task_1.status) #Check the status immediately after the start of processing
    time.sleep(1) #Wait 1 second
    task_2 = AsyncResult(task_id)
    print(task_2.status) #Check the status 1 second after the start of processing
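
Checking the status at fixed moments can be generalized into a small polling loop. The following is a minimal sketch, not part of Celery: wait_for_task and FINISHED_STATES are hypothetical names, and the status is supplied through a callable so the loop itself does not depend on Celery.

```python
import time

# States after which a task no longer changes.
# These mirror Celery's "ready" states: SUCCESS, FAILURE, REVOKED.
FINISHED_STATES = {"SUCCESS", "FAILURE", "REVOKED"}


def wait_for_task(get_status, timeout=30.0, interval=1.0, sleep=time.sleep):
    """Poll get_status() until it reports a finished state or the timeout passes.

    get_status: callable with no arguments that returns a status string.
    Returns the last status that was observed.
    """
    deadline = time.monotonic() + timeout
    status = get_status()
    while status not in FINISHED_STATES and time.monotonic() < deadline:
        sleep(interval)
        status = get_status()
    return status
```

With a real task, the callable could be something like lambda: AsyncResult(task_id).status. Because PENDING is also what Celery reports for an unknown task ID, the timeout keeps the loop from waiting forever on an ID that was never queued.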

--Save the result in DB
  --You probably won't use this often, but it is available as one approach.
  --By specifying CELERY_RESULT_BACKEND in settings.py, the status and result of asynchronous processing are saved to the DB automatically. This article assumes MySQL.
  --You can look up task information from Django by specifying the task ID.

import time

from django_celery_results.models import TaskResult
from myapp.tasks.task import hello

def hoge(project_id):
    task = hello.delay(project_id) #This is all you need to start asynchronous processing. The task then runs in the worker
    task_id = task.id #You can check the uniquely assigned ID.
    task_model_1 = TaskResult.objects.filter(task_id=task_id)
    if len(task_model_1) > 0:
        print(task_model_1[0].status) #Check the status immediately after the start of processing
    time.sleep(1) #Wait 1 second
    task_model_2 = TaskResult.objects.filter(task_id=task_id)
    if len(task_model_2) > 0:
        print(task_model_2[0].status) #Check the status 1 second after the start of processing

#Contents of MySQL

##The table is as follows.
mysql> show tables;
| Tables_in_nodeai                 |
| auth_group                       |
| auth_group_permissions           |
| auth_permission                  |
| django_admin_log                 |
| django_celery_results_taskresult |  ## <-This has been added!

## The schema of django_celery_results_taskresult is as follows.
mysql> desc django_celery_results_taskresult;
| Field            | Type         | Null | Key | Default | Extra          |
| id               | int(11)      | NO   | PRI | NULL    | auto_increment |
| task_id          | varchar(255) | NO   | UNI | NULL    |                |
| status           | varchar(50)  | NO   | MUL | NULL    |                |
| content_type     | varchar(128) | NO   |     | NULL    |                |
| content_encoding | varchar(64)  | NO   |     | NULL    |                |
| result           | longtext     | YES  |     | NULL    |                |
| date_done        | datetime(6)  | NO   | MUL | NULL    |                |
| traceback        | longtext     | YES  |     | NULL    |                |
| meta             | longtext     | YES  |     | NULL    |                |
| task_args        | longtext     | YES  |     | NULL    |                |
| task_kwargs      | longtext     | YES  |     | NULL    |                |
| task_name        | varchar(255) | YES  | MUL | NULL    |                |
| worker           | varchar(100) | YES  | MUL | NULL    |                |
| date_created     | datetime(6)  | NO   | MUL | NULL    |                |

Prevent the Task state from staying PENDING

--If CELERY_TASK_TRACK_STARTED is not specified in settings.py, the following holds while the task is running, until its result is available:
  --The status obtained using AsyncResult remains PENDING.
  --The task is not recorded in the DB. (It is saved only after the result is available.)
--To solve this, add the following setting to settings.py

CELERY_TASK_TRACK_STARTED = True


Try to fail Task

--If you raise an exception with raise, the task status becomes FAILURE
--The error message from the raised exception is stored in task.result

from __future__ import absolute_import, unicode_literals

import time

from celery import shared_task

@shared_task
def hello(proj_id):
    time.sleep(10) #Wait 10 seconds so you can confirm that the process runs asynchronously
    message = "hello"
    raise Exception("my error message") #Error notification

#Results stored in MySQL
mysql> select * from django_celery_results_taskresult\G
*************************** 31. row ***************************
              id: 31
         task_id: be294008-d2fc-4760-9055-483efdaa4970
          status: FAILURE
    content_type: application/json
content_encoding: utf-8
          result: {"exc_type": "Exception", "exc_message": ["my error message"], "exc_module": "builtins"}
       date_done: 2020-11-10 08:06:32.848782
       traceback: Traceback (most recent call last):...
            meta: {"children": []}
       task_args: (4,)
     task_kwargs: {}
       task_name: myapp.tasks.task.hello
          worker: celery@05ab2e4b5ee1
    date_created: 2020-11-10 08:06:22.829301
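
The result column of a failed task holds JSON, as shown in the row above, so it can be turned into a short readable message with the standard json module. This is a minimal sketch; summarize_failure is a hypothetical helper name, and the sample string is copied from the row above.

```python
import json


def summarize_failure(result_json):
    """Turn the JSON stored in the result column into 'ExcType: message'."""
    data = json.loads(result_json)
    message = data.get("exc_message", "")
    # In the stored row, exc_message is a list of the exception's arguments.
    if isinstance(message, (list, tuple)):
        message = ", ".join(str(part) for part in message)
    return "{}: {}".format(data.get("exc_type", "Unknown"), message)


stored = (
    '{"exc_type": "Exception", "exc_message": ["my error message"], '
    '"exc_module": "builtins"}'
)
print(summarize_failure(stored))
```

For the row above this prints "Exception: my error message".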

