Capturing Celery execution logs

Background

I often use the combination of Django + Redis (sometimes ElastiCache) + Celery.

A typical task looks something like the sketch below.
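
For reference, here is a minimal sketch of that usual pattern (the module path and task body are illustrative, and it assumes class-based tasks like the ones later in this article):

app/tasks/sample.py (illustrative)

from celery import Task


class PlainSampleTask(Task):
    """An ordinary Celery task with no logging."""

    def run(self, *args, **kwargs):
        pass  # the actual work goes here


# Caller side: enqueue the task through the Redis broker
PlainSampleTask().delay()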

Writing tasks is easy, but checking whether a task actually ran means digging through log files. With a single Celery worker that is manageable; with multiple workers it is hard to tell which one a task ended up on.

I want to know what was requested and what failed.

So I decided to write an execution log of each request -> success/failure to an RDB model, so it can be checked on the admin site.

Of course, even with just one Celery worker, this is easier than chasing log files.

I have already written a lot of Celery tasks, so I want the change to be as small as possible.

The base Task class

Normally you would start from from celery import Task, but instead let's write a Task subclass that handles logging and errors.

On the calling side, apply_async(), which delay() calls internally, logs the request. On the worker side, on_success() logs normal completion and on_failure() logs failures.

To use it, change the base class from Task to BaseHandlingTask, as SampleTask does below.

Note that if you swap the asynchronous delay() for a direct run() call, nothing is recorded, because run() executes synchronously and bypasses these handlers.

app/tasks/base/handling.py

import json
import logging
import socket
import sys
import traceback
from celery import Task
from django.db import transaction

from app.models import JobState


class BaseHandlingTask(Task):
    """Base task for error handling"""
    logger = logging.getLogger('prj')

    def apply_async(self, args=None, kwargs=None, task_id=None, producer=None, link=None, link_error=None, **options):
        """Apply tasks asynchronously by sending a message."""
        async_result = None
        try:
            async_result = super().apply_async(args, kwargs, task_id, producer, link, link_error, **options)
        except Exception:  # e.g. Redis is not up
            # If the caller is inside an atomic block, this row would be rolled back with it, so use the second connection
            with transaction.atomic(using='force'):
                exc_type, exc_value, exc_traceback = sys.exc_info()
                job_state = JobState()  # no task_id yet, since the send failed
                job_state.task_name = self.name
                job_state.name = self.name.split('.')[-1]
                if args:
                    job_state.args = json.dumps(list(args))
                if kwargs:
                    job_state.kwargs = json.dumps(kwargs)
                job_state.origin = socket.gethostname()
                job_state.exception_class = exc_value.__class__.__name__
                job_state.exception_msg = exc_value.args[0] if exc_value.args else exc_type.__module__ + '.' + exc_type.__name__
                job_state.traceback = traceback.format_exc()
                job_state.save(using='force')
            raise

        # Send succeeded: Redis is up. A Celery worker may or may not be up (if not, is_complete stays False)
        job_state, is_created = JobState.objects.get_or_create(task_id=async_result.id)
        job_state.task_name = async_result.task_name
        job_state.name = async_result.task_name.split('.')[-1]
        if args:
            job_state.args = json.dumps(list(args))
        if kwargs:
            job_state.kwargs = json.dumps(kwargs)
        job_state.origin = socket.gethostname()
        job_state.save()

        return async_result

    def on_success(self, retval, task_id, args, kwargs):
        """Success handler -Handler at normal time-Called on the Celery worker side"""
        job_state, is_created = JobState.objects.get_or_create(task_id=task_id)
        job_state.task_name = self.name
        job_state.name = self.name.split('.')[-1]
        if args:
            job_state.args = json.dumps(list(args))
        if kwargs:
            job_state.kwargs = json.dumps(kwargs)
        if self.request.get('origin'):
            job_state.origin = self.request.get('origin')
        job_state.hostname = self.request.get('hostname')
        job_state.is_complete = True
        job_state.save()

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Error handler -Handler on error-Called on the Celery worker side
            - run()Even if it is in the atomic block of, the record is not rolled back because it is called separately by the worker.
        """
        job_state, is_created = JobState.objects.get_or_create(task_id=task_id)
        job_state.task_name = self.name
        job_state.name = self.name.split('.')[-1]
        if args:
            job_state.args = json.dumps(list(args))
        if kwargs:
            job_state.kwargs = json.dumps(kwargs)
        if self.request.get('origin'):
            job_state.origin = self.request.get('origin')
        job_state.hostname = self.request.get('hostname')
        job_state.exception_class = exc.__class__.__name__
        job_state.exception_msg = str(exc)
        job_state.traceback = str(einfo)
        job_state.save()

    def run(self, *args, **kwargs):
        """The body of the task executed by workers."""
        raise NotImplementedError('Tasks must define the run method.')


class SampleTask(BaseHandlingTask):
    """Sample task"""
    logger = logging.getLogger('prj')

    def run(self, is_error=False):
        self.logger.info('SampleTask start...')
        with transaction.atomic():
            if is_error:
                raise ValueError('It is an error')
        self.logger.info('SampleTask end.')
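
To see the logging in action, here is a quick check to try from a Django shell (assuming a worker is running and the broker is up):

# apply_async() records the request; the worker then records success or failure
SampleTask().delay()               # runs normally, is_complete becomes True
SampleTask().delay(is_error=True)  # fails, exception details are recorded

# Calling run() directly is synchronous and bypasses the handlers,
# so no JobState row is written
SampleTask().run()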

JobState model

This is the model the log is written to.

It should be fairly self-explanatory.

app/models/job_state.py

from django.db import models


class JobState(models.Model):
    """Job status"""
    task_id = models.CharField('Task ID', max_length=255, blank=True, null=True, db_index=True)   # UUID
    task_name = models.CharField('Task name', max_length=255, blank=True, null=True)     # e.g. app.tasks.handling.SampleTask
    name = models.CharField('Class name', max_length=255, blank=True, null=True)          # e.g. SampleTask
    args = models.TextField('args', null=True, blank=True)
    kwargs = models.TextField('kwargs', null=True, blank=True)
    is_complete = models.BooleanField('Done', default=False)
    origin = models.CharField('origin', max_length=255, blank=True, null=True)  # Name of host that sent this task.
    hostname = models.CharField('hostname', max_length=255, blank=True, null=True)  # Node name of the worker instance executing the task.
    exception_class = models.CharField('Exception class', max_length=255, null=True, blank=True, default='')
    exception_msg = models.CharField('Exception message', max_length=255, null=True, blank=True, default='')
    traceback = models.TextField('traceback', null=True, blank=True, default='')

    created_at = models.DateTimeField('Registered Date', auto_now_add=True, blank=True, null=True)
    updated_at = models.DateTimeField('Update date and time', auto_now=True, blank=True, null=True)

    def __str__(self):
        return self.task_id if self.task_id else str(self.id)
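
With the model in place, stuck or failed jobs can also be pulled up with ordinary queries, not just on the admin site. A couple of illustrative examples:

from app.models import JobState

# Requested but never finished (worker down, still queued, or failed)
pending_or_failed = JobState.objects.filter(is_complete=False)

# Failed jobs, with their exception details
failed = JobState.objects.exclude(exception_class='').exclude(exception_class__isnull=True)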

Admin site registration

Make JobState visible on the admin site. Some of the field values are long as they are, so they are shortened in places.

app/admin.py

from django.contrib import admin
from app.models import JobState

class JobStateAdmin(admin.ModelAdmin):
    """Job status"""
    list_display = ('id', 'task_id_shorten', 'name', 'x_args', 'x_kwargs', 'is_complete', 'exception_class', 'origin', 'hostname', 'created_at')
    list_display_links = ('id', 'task_id_shorten', 'name')
    list_filter = ('is_complete',)
    search_fields = ['task_id', 'task_name', 'name']

    def task_id_shorten(self, obj):
        return obj.task_id[:8] + '...' if obj.task_id else ''
    task_id_shorten.short_description = 'Task ID'

    def x_args(self, obj):
        return obj.args[:20] + '...' if obj.args and len(obj.args) > 20 else obj.args
    x_args.short_description = 'args'

    def x_kwargs(self, obj):
        return obj.kwargs[:20] + '...' if obj.kwargs and len(obj.kwargs) > 20 else obj.kwargs
    x_kwargs.short_description = 'kwargs'

admin.site.register(JobState, JobStateAdmin)

Settings

If the caller is inside transaction.atomic(), the JobState row would be rolled back along with everything else on an error. So, a bit brute force, I add a second database connection for forced writes.

If you don't need to go that far, you can skip this.

Add the following below DATABASES = {}:

prj/settings/local.py

# Create a second connection with the same settings so that JobState can be written even if an exception occurs inside an atomic block.
DATABASES.update({'force': DATABASES['default']})
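
As a sanity check, here is a minimal sketch (to run in a Django shell, assuming the 'force' alias configured above) showing that a row written through the second connection survives a rollback on the default connection:

from django.db import transaction
from app.models import JobState

try:
    with transaction.atomic():  # outer block on the 'default' connection
        with transaction.atomic(using='force'):
            JobState.objects.using('force').create(task_name='demo')
        raise ValueError('boom')  # rolls back only the 'default' connection
except ValueError:
    pass

# The row is still there: it was committed on the 'force' connection
print(JobState.objects.filter(task_name='demo').exists())  # True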

That's all

The code is rough, but now you can see the task history at a glance.

If logging successful runs as well is too noisy, comment out the parts that write the record on success.

Also, cron-style jobs (Celery beat) that fire every 15 minutes would flood the table, so those do not inherit BaseHandlingTask in the first place; they inherit the plain Task. Adjust to your needs.
