I often use the combination of Django + Redis (sometimes ElastiCache) + Celery.
I'm doing it like that.
It's easy to write, but to check whether a task actually ran you have to dig through log files. That is manageable with a single Celery worker, but with multiple workers it is hard to find which one executed the task.
So, I will put out the execution log of request-> success / failure in the model of RDB so that it can be confirmed on the admin site.
Of course, even with just one Celery worker, it's easier than following the log files.
I have already written a lot of Celery Tasks, so I want to make it as easy as possible.
Normally you would start with from celery import Task and write your task directly, but here let's write a base Task that does the error handling.
On the caller side, the request is logged in apply_async(), which delay() calls internally. On the worker side, on_success() logs normal completion and on_failure() logs errors.
To use it, change the inheritance source to Task-> BaseHandlingTask like SampleTask.
By the way, if you replace the asynchronous delay() call with a synchronous run() call, nothing will be recorded, because these handlers only fire for asynchronous execution.
app/tasks/base/handling.py
import json
import logging
import socket
import sys
import traceback
from celery import Task
from django.db import transaction
from app.models import JobState
class BaseHandlingTask(Task):
    """Base Celery task that records request/success/failure in the JobState model.

    The caller-side hook ``apply_async()`` (which ``delay()`` calls internally)
    records the request; the worker-side hooks ``on_success()`` /
    ``on_failure()`` record the outcome.  Running a task synchronously via
    ``run()`` records nothing, because none of these hooks fire in that case.
    """

    logger = logging.getLogger('prj')

    def _fill_request_fields(self, job_state, task_name, args, kwargs):
        """Populate the fields common to every request record; returns job_state."""
        job_state.task_name = task_name
        job_state.name = task_name.split('.')[-1]
        if args:
            job_state.args = json.dumps(list(args))
        if kwargs:
            job_state.kwargs = json.dumps(kwargs)
        return job_state

    def _worker_job_state(self, task_id, args, kwargs):
        """Fetch or create the JobState row for task_id and fill the worker-side
        common fields (task names, serialized arguments, origin/hostname)."""
        job_state, _ = JobState.objects.get_or_create(task_id=task_id)
        self._fill_request_fields(job_state, self.name, args, kwargs)
        if self.request.get('origin'):
            job_state.origin = self.request.get('origin')
        job_state.hostname = self.request.get('hostname')
        return job_state

    def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
                    link=None, link_error=None, **options):
        """Apply the task asynchronously and record the request in JobState.

        If publishing the message fails (e.g. the Redis broker is down), the
        failure is recorded through the secondary 'force' DB connection so the
        record survives a rollback of the caller's atomic block, and the
        original exception is re-raised.
        """
        try:
            async_result = super().apply_async(
                args, kwargs, task_id, producer, link, link_error, **options)
        except Exception:  # broker unavailable (Redis is not up, etc.)
            # If the caller is inside an atomic block this row would be rolled
            # back together with it, so write it via the second connection.
            with transaction.atomic(using='force'):
                exc_type, exc_value, _ = sys.exc_info()
                # No task id was ever assigned, so a fresh row is created.
                job_state = self._fill_request_fields(JobState(), self.name, args, kwargs)
                job_state.origin = socket.gethostname()
                job_state.exception_class = exc_value.__class__.__name__
                job_state.exception_msg = (
                    exc_value.args[0] if exc_value.args
                    else exc_type.__module__ + '.' + exc_type.__name__
                )
                job_state.traceback = traceback.format_exc()
                job_state.save(using='force')
            raise
        # Publishing succeeded: Redis is up.  Whether a worker ever picks the
        # task up is unknown here -- is_complete stays False until on_success.
        job_state, _ = JobState.objects.get_or_create(task_id=async_result.id)
        self._fill_request_fields(job_state, async_result.task_name, args, kwargs)
        job_state.origin = socket.gethostname()
        job_state.save()
        return async_result

    def on_success(self, retval, task_id, args, kwargs):
        """Success handler -- runs on the Celery worker after run() returns."""
        job_state = self._worker_job_state(task_id, args, kwargs)
        job_state.is_complete = True
        job_state.save()

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Error handler -- runs on the Celery worker when run() raises.

        Even if run() raised inside an atomic block, this record is not rolled
        back, because the handler is invoked separately by the worker.
        """
        job_state = self._worker_job_state(task_id, args, kwargs)
        job_state.exception_class = exc.__class__.__name__
        job_state.exception_msg = str(exc)
        job_state.traceback = str(einfo)
        job_state.save()

    def run(self, *args, **kwargs):
        """The body of the task executed by workers."""
        raise NotImplementedError('Tasks must define the run method.')
class SampleTask(BaseHandlingTask):
    """Sample task demonstrating BaseHandlingTask usage."""

    logger = logging.getLogger('prj')

    def run(self, is_error=False):
        """Log start and end; raise ValueError when is_error is True."""
        self.logger.info('SampleTask start...')
        with transaction.atomic():
            if is_error:
                raise ValueError('It is an error')
        self.logger.info('SampleTask end.')
This is the model the log is written to. It is simple enough to read at a glance.
app/models/job_state.py
from django.db import models
class JobState(models.Model):
    """Execution state of a single Celery task, browsable on the admin site."""

    # Celery task UUID; indexed so the worker-side handlers can look the row up.
    task_id = models.CharField('Task ID', max_length=255, blank=True, null=True, db_index=True)
    # Dotted task path, e.g. app.tasks.handling.SampleTask
    task_name = models.CharField('Task name', max_length=255, blank=True, null=True)
    # Trailing class name only, e.g. SampleTask
    name = models.CharField('name of the class', max_length=255, blank=True, null=True)
    args = models.TextField('args', null=True, blank=True)
    kwargs = models.TextField('kwargs', null=True, blank=True)
    is_complete = models.BooleanField('Done', default=False)
    # Name of the host that sent this task.
    origin = models.CharField('origin', max_length=255, blank=True, null=True)
    # Node name of the worker instance that executed the task.
    hostname = models.CharField('hostname', max_length=255, blank=True, null=True)
    exception_class = models.CharField('Exception class', max_length=255, null=True, blank=True, default='')
    exception_msg = models.CharField('Exception message', max_length=255, null=True, blank=True, default='')
    traceback = models.TextField('traceback', null=True, blank=True, default='')
    created_at = models.DateTimeField('Registered Date', auto_now_add=True, blank=True, null=True)
    updated_at = models.DateTimeField('Update date and time', auto_now=True, blank=True, null=True)

    def __str__(self):
        # Fall back to the primary key while no task id has been assigned.
        return self.task_id or str(self.id)
Make JobState visible on the admin site. Shown as-is the listing gets long, so some values are shortened.
app/admin.py
from django.contrib import admin
from app.models import JobState
class JobStateAdmin(admin.ModelAdmin):
    """Admin changelist for JobState with shortened display values."""

    list_display = ('id', 'task_id_shorten', 'name', 'x_args', 'x_kwargs',
                    'is_complete', 'exception_class', 'origin', 'hostname',
                    'created_at')
    list_display_links = ('id', 'task_id_shorten', 'name')
    list_filter = ('is_complete',)
    search_fields = ['task_id', 'task_name', 'name']

    def task_id_shorten(self, obj):
        """First 8 characters of the task UUID, or '' when absent."""
        if not obj.task_id:
            return ''
        return obj.task_id[:8] + '...'
    task_id_shorten.short_description = 'Task ID'

    def x_args(self, obj):
        """Positional args, truncated to 20 characters for the changelist."""
        value = obj.args
        if value and len(value) > 20:
            return value[:20] + '...'
        return value
    x_args.short_description = 'args'

    def x_kwargs(self, obj):
        """Keyword args, truncated to 20 characters for the changelist."""
        value = obj.kwargs
        if value and len(value) > 20:
            return value[:20] + '...'
        return value
    x_kwargs.short_description = 'kwargs'


admin.site.register(JobState, JobStateAdmin)
settings
If the caller is inside transaction.atomic(), JobState would be rolled back along with it on error, so — a bit brute force — I add a second database connection for forced writes.
If you don't have to go that far, you can do it.
Add the following under DATABASE = {}
prj/settings/local.py
# Make a second connection with the same settings so that JobState can still be
# written even if an exception occurs inside the caller's atomic block.
# NOTE(review): 'force' aliases the same settings dict as 'default' -- Django
# opens a separate connection per alias, which is what makes the trick work.
DATABASES.update({'force': DATABASES['default']})
It's a rough-cut code, but now you can see it at a glance.
If it is too much to include the normal time, comment out the part that is put out in the normal time.
Also, cron jobs (Celery beat) that run every 15 minutes would produce far too many records, so those tasks inherit the plain Task instead of BaseHandlingTask. Please adjust as needed.
Recommended Posts