** Nous modifierons également la bibliothèque standard ** ** Aucune modification n'est requise sauf dans l'environnement Windows ou lorsque le traitement parallèle n'est pas effectué **
Un type de planificateur de travaux. S'il existe des dépendances entre plusieurs travaux, ils les exécuteront dans le bon ordre. De plus, s'il n'y a pas de dépendances entre les travaux, ils seront exécutés en parallèle.
Veuillez consulter la page officielle pour plus de détails. spotify/luigi
La parallélisation n'est pas possible uniquement dans l'environnement Windows.
La raison en est que luigi utilise pickle pour sérialiser les travaux entre les processus, mais certains objets ne peuvent pas être sérialisés par l'implémentation pickle dans l'environnement Windows. (Peut-être)
Réécrivez la bibliothèque. Il existe deux cibles de réécriture: luigi / worker.py et la bibliothèque standard multiprocessing / reduction.py.
Lib/site-packages/luigi/worker.py
#Ajouter une importation
from functools import partial
# TaskProcess.__init__interne
class TaskProcess(multiprocessing.Process):
...
def __init__(self, task, worker_id, result_queue, tracking_url_callback,
status_message_callback, use_multiprocessing=False, worker_timeout=0):
...
# self.tracking_url_callback = tracking_url_callback
self.tracking_url_callback = partial(tracking_url_callback, task)
# self.status_message_callback = status_message_callback
self.status_message_callback = partial(status_message_callback, task)
...
...
class worker(Config):
...
# Worker._create_task_Déplacer les fonctions à l'intérieur du processus
def _update_tracking_url(self, task, tracking_url):
self._scheduler.add_task(
task_id=task.task_id,
worker=self._id,
status=RUNNING,
tracking_url=tracking_url,
)
# Worker._create_task_Déplacer les fonctions à l'intérieur du processus
def _update_status_message(self, task, message):
self._scheduler.set_task_status_message(task.task_id, message)
def _create_task_process(self, task):
# def update_tracking_url(tracking_url):
# self._scheduler.add_task(
# task_id=task.task_id,
# worker=self._id,
# status=RUNNING,
# tracking_url=tracking_url,
# )
# def update_status_message(message):
# self._scheduler.set_task_status_message(task.task_id, message)
return TaskProcess(
task, self._id, self._task_result_queue, self._update_tracking_url, self._update_status_message,
use_multiprocessing=bool(self.worker_processes > 1),
worker_timeout=self._config.timeout
)
...
Lib/multiprocessing/reduction.py
#Importer la pièce au début
# import pickle
import dill as pickle
l'aneth peut être installé avec pip.
Utilisez l'ancienne version. Dans ce cas, il n'y a pas grand besoin de modification.
Cela ne modifie pas la bibliothèque standard. J'ai eu une erreur lorsque j'ai essayé de traiter luigi en parallèle sur Windows, mais la solution Pickle crashing when trying to pickle "update_tracking_url" in luigi.worker?
Dans la mesure où je l'utilise réellement, ce n'est pas un problème, Veuillez modifier à vos propres risques.