Forcer luigi à effectuer un traitement parallèle dans l'environnement Windows

** Nous modifierons également la bibliothèque standard ** ** Aucune modification n'est requise sauf dans l'environnement Windows ou lorsque le traitement parallèle n'est pas effectué **

introduction

Qu'est-ce que Luigi

Un type de planificateur de travaux. S'il existe des dépendances entre plusieurs travaux, ils les exécuteront dans le bon ordre. De plus, s'il n'y a pas de dépendances entre les travaux, ils seront exécutés en parallèle.

Veuillez consulter la page officielle pour plus de détails. spotify/luigi

Problèmes dans l'environnement Windows

La parallélisation n'est pas possible uniquement dans l'environnement Windows.

La raison en est que luigi utilise pickle pour sérialiser les travaux entre les processus, mais certains objets ne peuvent pas être sérialisés par l'implémentation pickle dans l'environnement Windows. (Peut-être)

Solution

Comment modifier de force (version 2.2.0 ou supérieure)

Réécrivez la bibliothèque. Il existe deux cibles de réécriture: luigi / worker.py et la bibliothèque standard multiprocessing / reduction.py.

Lib/site-packages/luigi/worker.py


#Ajouter une importation
from functools import partial

# TaskProcess.__init__interne
class TaskProcess(multiprocessing.Process):
    ...
    def __init__(self, task, worker_id, result_queue, tracking_url_callback,
                 status_message_callback, use_multiprocessing=False, worker_timeout=0):
        ...
        # self.tracking_url_callback = tracking_url_callback
        self.tracking_url_callback = partial(tracking_url_callback, task)
        # self.status_message_callback = status_message_callback
        self.status_message_callback = partial(status_message_callback, task)
        ...
    ...

class worker(Config):
    ...
    # Worker._create_task_Déplacer les fonctions à l'intérieur du processus
    def _update_tracking_url(self, task, tracking_url):
            self._scheduler.add_task(
                task_id=task.task_id,
                worker=self._id,
                status=RUNNING,
                tracking_url=tracking_url,
            )
        
    # Worker._create_task_Déplacer les fonctions à l'intérieur du processus
    def _update_status_message(self, task, message):
        self._scheduler.set_task_status_message(task.task_id, message)

    def _create_task_process(self, task):
        # def update_tracking_url(tracking_url):
        #     self._scheduler.add_task(
        #         task_id=task.task_id,
        #         worker=self._id,
        #         status=RUNNING,
        #         tracking_url=tracking_url,
        #     )

        # def update_status_message(message):
        #     self._scheduler.set_task_status_message(task.task_id, message)

        return TaskProcess(
            task, self._id, self._task_result_queue, self._update_tracking_url, self._update_status_message,
            use_multiprocessing=bool(self.worker_processes > 1),
            worker_timeout=self._config.timeout
        )
    ...

Lib/multiprocessing/reduction.py


#Importer la pièce au début
# import pickle
import dill as pickle

l'aneth peut être installé avec pip.

Une autre solution

Utilisez l'ancienne version. Dans ce cas, il n'y a pas grand besoin de modification.

Cela ne modifie pas la bibliothèque standard. J'ai eu une erreur lorsque j'ai essayé de traiter luigi en parallèle sur Windows, mais la solution Pickle crashing when trying to pickle "update_tracking_url" in luigi.worker?

finalement

Dans la mesure où je l'utilise réellement, ce n'est pas un problème, Veuillez modifier à vos propres risques.

Recommended Posts

Forcer luigi à effectuer un traitement parallèle dans l'environnement Windows
Comment faire un traitement parallèle multicœur avec python
virtualenvwrapper dans l'environnement Windows
Que faire lorsqu'une erreur SSL se produit avec pip dans l'environnement Windows10, miniconda, VScode
Correction d'un moyen pour l'UEFI de forcer Windows à démarrer
Comment utiliser VS Code dans un environnement Venv avec Windows
Double-cliquez sur ipynb dans l'environnement windows + anaconda pour l'ouvrir dans jupyter-notebook
Le traitement parallèle de Python joblib ne fonctionne pas dans l'environnement uWSGI. Comment traiter en parallèle sur uWSGI?
J'ai eu une erreur lorsque j'ai essayé de traiter luigi en parallèle dans Windows, mais la solution
[Python] Comment faire PCA avec Python
Importer des fichiers de Windows vers WSL
Impossible d'importer les packages installés dans l'environnement virtuel avec Anaconda sous Windows 10
Que faire si vous ne pouvez pas installer avec pip dans l'environnement babun
Méthode pour créer un environnement Python dans Xcode 6
Comment faire R chartr () en Python
Configurer Pipenv sur Pycharm dans un environnement Windows
Pour référencer des variables d'environnement en Python dans Blender
Comment faire des événements envoyés par le serveur dans Django
Introduction à docker Création d'un environnement ubuntu dans ubuntu
Caractères Python déformés dans l'environnement Windows + Git Bash
Ce à quoi j'étais accro lors de la création d'applications Web dans un environnement Windows
Comment prendre plusieurs arguments lors d'un traitement parallèle à l'aide du multitraitement en python
Je souhaite utiliser Python dans l'environnement de pyenv + pipenv sous Windows 10
Comment installer le framework d'apprentissage en profondeur Tensorflow 1.0 dans l'environnement Windows Anaconda
Exécutez PIFuHD dans l'environnement Windows + Anaconda + Git Bash
Je veux faire le test de Dunnett en Python
Que faire si pipreqs aboutit à UnicodeDecodeError
Spécification du navigateur Jupyter Notebook dans un environnement Windows
Implémentation minimale d'Union Find en Python
Installez Python 3.5.1 + numpy + scipy + α dans l'environnement Windows
Une manière intelligente de chronométrer le traitement avec Python
EP 11 Utiliser `zip` pour traiter les itérateurs en parallèle
Traitement d'image avec la configuration de l'environnement Python pour Windows
Je veux faire pyenv + pipenv même sous Windows
Traitement parallèle sans signification profonde en Python
Que faire pour obtenir une feuille de calcul Google en Python
[TF] Comment créer Tensorflow dans un environnement Proxy
Construction d'environnement (Windows 10) pour 100 coups de science des données (traitement de données structurées)
Utilisez os.getenv pour obtenir des variables d'environnement en Python
J'ai essayé de créer un environnement avec WSL + Ubuntu + VS Code dans un environnement Windows
Que faire si vous ne voyez pas IntelliSense de Python dans VS Code sous Windows