J'utilise souvent la combinaison de Django + Redis (parfois ElastiCashe) + Celery.
Je fais ça comme ça.
C'est facile à écrire, mais si vous voulez savoir si cela a coulé, vous pouvez consulter le fichier journal s'il n'y a qu'un seul ouvrier Celery, mais s'il y a plusieurs ouvriers Celery, il est difficile de savoir où il s'est écoulé.
Alors, sortons le journal d'exécution de request-> success / échec dans le modèle de RDB afin qu'il puisse être confirmé sur le site d'administration.
Bien sûr, même avec un seul ouvrier Celery, c'est plus facile que de suivre un fichier journal.
J'ai déjà écrit beaucoup de tâches de céleri, donc je veux le rendre aussi simple que possible.
Normalement, vous commencez à écrire avec from celery import Task
, mais écrivons une tâche qui prend en charge la gestion des erreurs.
L'appelant utilise apply_async (), qui est appelée après delay (), pour enregistrer la requête. Le côté appelé enregistre on_success () quand il est normal et on_failure () quand il est anormal.
Pour l'utiliser, changez la source d'héritage en Task-> BaseHandlingTask comme SampleTask.
Au fait, si vous créez delay () de manière asynchrone et que vous le changez en run (), rien ne sera enregistré car il est synchrone.
app/tasks/base/handling.py
import json
import logging
import socket
import sys
import traceback
from celery import Task
from django.db import transaction
from app.models import JobState
class BaseHandlingTask(Task):
"""Tâche de base pour la gestion des erreurs"""
logger = logging.getLogger('prj')
def apply_async(self, args=None, kwargs=None, task_id=None, producer=None, link=None, link_error=None, **options):
"""Apply tasks asynchronously by sending a message."""
async_result = None
try:
async_result = super().apply_async(args, kwargs, task_id, producer, link, link_error, **options)
except: #Redis n'est pas en place, etc.
#Si l'appelant est à l'intérieur du bloc atomique, il sera restauré ensemble, utilisez donc la deuxième connexion
with transaction.atomic(using='force'):
exc_type, exc_value, exc_traceback = sys.exc_info()
job_state = JobState() # task_pas d'identifiant
job_state.task_name = self.name
job_state.name = self.name.split('.')[-1]
if args:
job_state.args = json.dumps(list(args))
if kwargs:
job_state.kwargs = json.dumps(kwargs)
job_state.origin = socket.gethostname()
job_state.exception_class = exc_value.__class__.__name__
job_state.exception_msg = exc_value.args[0] if exc_value.args else exc_type.__module__ + '.' + exc_type.__name__
job_state.traceback = traceback.format_exc()
job_state.save(using='force')
raise
#Lorsque le démarrage est réussi-Redis est en place. Le céleri est levé / pas levé(is_complete ==Reste faux)
job_state, is_created = JobState.objects.get_or_create(task_id=async_result.id)
job_state.task_name = async_result.task_name
job_state.name = async_result.task_name.split('.')[-1]
if args:
job_state.args = json.dumps(list(args))
if kwargs:
job_state.kwargs = json.dumps(kwargs)
job_state.origin = socket.gethostname()
job_state.save()
return async_result
def on_success(self, retval, task_id, args, kwargs):
"""Success handler -Manutentionnaire à l'heure normale-Appelé du côté ouvrier du céleri"""
job_state, is_created = JobState.objects.get_or_create(task_id=task_id)
job_state.task_name = self.name
job_state.name = self.name.split('.')[-1]
if args:
job_state.args = json.dumps(list(args))
if kwargs:
job_state.kwargs = json.dumps(kwargs)
if self.request.get('origin'):
job_state.origin = self.request.get('origin')
job_state.hostname = self.request.get('hostname')
job_state.is_complete = True
job_state.save()
def on_failure(self, exc, task_id, args, kwargs, einfo):
"""Error handler -Gestionnaire en cas d'erreur-Appelé du côté ouvrier du céleri
- run()Même à l'intérieur du bloc atomique de, l'enregistrement n'est pas annulé car il est appelé séparément par le worker.
"""
job_state, is_created = JobState.objects.get_or_create(task_id=task_id)
job_state.task_name = self.name
job_state.name = self.name.split('.')[-1]
if args:
job_state.args = json.dumps(list(args))
if kwargs:
job_state.kwargs = json.dumps(kwargs)
if self.request.get('origin'):
job_state.origin = self.request.get('origin')
job_state.hostname = self.request.get('hostname')
job_state.exception_class = exc.__class__.__name__
job_state.exception_msg = str(exc)
job_state.traceback = str(einfo)
job_state.save()
def run(self, *args, **kwargs):
"""The body of the task executed by workers."""
raise NotImplementedError('Tasks must define the run method.')
class SampleTask(BaseHandlingTask):
"""Exemple de tâche"""
logger = logging.getLogger('prj')
def run(self, is_error=False):
self.logger.info('SampleTask start...')
with transaction.atomic():
if is_error:
raise ValueError('C'est une erreur')
self.logger.info('SampleTask end.')
Il s'agit du modèle vers lequel le journal est généré.
Tu peux voir.
app/models/job_state.py
from django.db import models
class JobState(models.Model):
"""Statut de l'emploi"""
task_id = models.CharField('ID de tâche', max_length=255, blank=True, null=True, db_index=True) # UUID
task_name = models.CharField('Nom de la tâche', max_length=255, blank=True, null=True) #Exemple: app.tasks.handling.SampleTask
name = models.CharField('nom de la classe', max_length=255, blank=True, null=True) #Exemple: SampleTask
args = models.TextField('args', null=True, blank=True)
kwargs = models.TextField('kwargs', null=True, blank=True)
is_complete = models.BooleanField('Terminé', default=False)
origin = models.CharField('origin', max_length=255, blank=True, null=True) # Name of host that sent this task.
hostname = models.CharField('hostname', max_length=255, blank=True, null=True) # Node name of the worker instance executing the task.
exception_class = models.CharField('Classe d'exception', max_length=255, null=True, blank=True, default='')
exception_msg = models.CharField('Message d'exception', max_length=255, null=True, blank=True, default='')
traceback = models.TextField('traceback', null=True, blank=True, default='')
created_at = models.DateTimeField('Date d'enregistrement', auto_now_add=True, blank=True, null=True)
updated_at = models.DateTimeField('Mettre à jour la date et l'heure', auto_now=True, blank=True, null=True)
def __str__(self):
return self.task_id if self.task_id else str(self.id)
Rendez JobState visible sur le site d'administration. S'il est laissé tel quel, il sera plus long, il peut donc être raccourci à certains endroits.
app/admin.py
from django.contrib import admin
from app.models import JobState
class JobStateAdmin(admin.ModelAdmin):
"""Statut de l'emploi"""
list_display = ('id', 'task_id_shorten', 'name', 'x_args', 'x_kwargs', 'is_complete', 'exception_class', 'origin', 'hostname', 'created_at')
list_display_links = ('id', 'task_id_shorten', 'name')
list_filter = ('is_complete',)
search_fields = ['task_id', 'task_name', 'name']
def task_id_shorten(self, obj):
return obj.task_id[:8] + '...' if obj.task_id else ''
task_id_shorten.short_description = 'ID de tâche'
def x_args(self, obj):
return obj.args[:20] + '...' if obj.args and len(obj.args) > 20 else obj.args
x_args.short_description = 'args'
def x_kwargs(self, obj):
return obj.kwargs[:20] + '...' if obj.kwargs and len(obj.kwargs) > 20 else obj.kwargs
x_kwargs.short_description = 'kwargs'
admin.site.register(JobState, JobStateAdmin)
settings
Si l'appelant est transaction.atomic (), JobState sera également annulé en cas d'erreur, donc c'est un peu la force brute, mais nous augmentons une autre connexion à la base de données pour les écritures forcées.
Si vous n'êtes pas obligé d'aller aussi loin, vous pouvez le faire.
Ajoutez ce qui suit sous DATABASE = {}
prj/settings/local.py
#Établissez une deuxième connexion avec les mêmes paramètres afin de pouvoir écrire dans JobState même si vous faites une exception dans le bloc atmic
DATABASES.update({'force': DATABASES['default']})
C'est un code approximatif, mais maintenant vous pouvez le voir en un coup d'œil.
Si c'est trop pour inclure le temps normal, commentez l'endroit qui est mis dehors dans le temps normal.
De plus, comme il y a trop de tâches cron (battements de céleri) qui s'écoulent toutes les 15 minutes, n'héritez pas de BaseHandlingTask en premier lieu, mais héritez de la tâche simple.
Veuillez ajuster avec etc.
Recommended Posts