Forcing luigi to run tasks in parallel on Windows

Note: this approach also modifies the standard library. No modification is needed outside Windows, or if you do not run tasks in parallel.

Introduction

What is luigi

luigi is a kind of job scheduler. When jobs depend on one another, it runs them in the correct order; jobs with no dependencies between them are executed in parallel.

See the official page for details: spotify/luigi

Problems on Windows

Parallel execution fails on Windows only.

The likely reason is that luigi passes jobs between processes by serializing them with pickle, and some of the objects involved cannot be pickled on Windows. (This is my best guess.)
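As a rough illustration of the suspected cause: on Windows, multiprocessing starts child processes by spawning a fresh interpreter, so the process object and everything attached to it has to be pickled. The sketch below uses only the standard library and hypothetical names (`make_closure`, `can_pickle`); it is not luigi code. It shows that a function defined inside another function cannot be pickled, while a `functools.partial` wrapping an importable callable (here `print`, purely as a stand-in) can.

```python
import pickle
from functools import partial


def make_closure(task_id):
    # A nested function that captures task_id, similar in shape to the
    # callbacks luigi defines inside Worker._create_task_process.
    def update_status_message(message):
        print(task_id, message)

    return update_status_message


def can_pickle(obj):
    # Report whether the standard pickle module can serialize obj.
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


closure_callback = make_closure("task-1")
partial_callback = partial(print, "task-1")  # print is only a stand-in

closure_ok = can_pickle(closure_callback)
partial_ok = can_pickle(partial_callback)
```

In luigi itself the partial would wrap a bound method of the worker, so the worker object must be picklable too, which is presumably why the standard pickle alone is not enough.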

Solution

Forcing it to work by patching (luigi 2.2.0 or later)

Patch the libraries directly. Two files need changes: luigi/worker.py and the standard library's multiprocessing/reduction.py.

Lib/site-packages/luigi/worker.py


# Add this import
from functools import partial

# Inside TaskProcess.__init__
class TaskProcess(multiprocessing.Process):
    ...
    def __init__(self, task, worker_id, result_queue, tracking_url_callback,
                 status_message_callback, use_multiprocessing=False, worker_timeout=0):
        ...
        # self.tracking_url_callback = tracking_url_callback
        self.tracking_url_callback = partial(tracking_url_callback, task)
        # self.status_message_callback = status_message_callback
        self.status_message_callback = partial(status_message_callback, task)
        ...
    ...

class Worker(object):
    ...
    # Moved out of Worker._create_task_process and turned into a method
    def _update_tracking_url(self, task, tracking_url):
        self._scheduler.add_task(
            task_id=task.task_id,
            worker=self._id,
            status=RUNNING,
            tracking_url=tracking_url,
        )

    # Moved out of Worker._create_task_process and turned into a method
    def _update_status_message(self, task, message):
        self._scheduler.set_task_status_message(task.task_id, message)

    def _create_task_process(self, task):
        # def update_tracking_url(tracking_url):
        #     self._scheduler.add_task(
        #         task_id=task.task_id,
        #         worker=self._id,
        #         status=RUNNING,
        #         tracking_url=tracking_url,
        #     )

        # def update_status_message(message):
        #     self._scheduler.set_task_status_message(task.task_id, message)

        return TaskProcess(
            task, self._id, self._task_result_queue, self._update_tracking_url, self._update_status_message,
            use_multiprocessing=bool(self.worker_processes > 1),
            worker_timeout=self._config.timeout
        )
    ...
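The patch above replaces nested callbacks with methods that take the task as an explicit argument, then binds that argument with `partial`. A minimal sketch of why the call sites do not need to change follows. `FakeScheduler` and `FakeWorker` are hypothetical stand-ins, not luigi classes, and a plain task id string is passed instead of a task object to keep the sketch short.

```python
from functools import partial


class FakeScheduler:
    """Hypothetical stand-in for luigi's scheduler; it only records calls."""

    def __init__(self):
        self.messages = []

    def set_task_status_message(self, task_id, message):
        self.messages.append((task_id, message))


class FakeWorker:
    """Hypothetical stand-in for luigi.worker.Worker."""

    def __init__(self, scheduler):
        self._scheduler = scheduler

    # Method form: the task is an explicit argument, not a captured variable.
    def _update_status_message(self, task_id, message):
        self._scheduler.set_task_status_message(task_id, message)

    def make_callback(self, task_id):
        # Bind the task up front; the caller supplies only the message.
        return partial(self._update_status_message, task_id)


scheduler = FakeScheduler()
callback = FakeWorker(scheduler).make_callback("task-1")
callback("50% done")  # same one-argument call the nested function accepted
recorded = scheduler.messages
```

The resulting callback is still called with a single argument, so code that previously invoked the nested function keeps working unchanged.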

Lib/multiprocessing/reduction.py


# In the imports at the top of the file
# import pickle
import dill as pickle

dill can be installed with pip (pip install dill).

Another solution

Use an older version of luigi. In that case, little modification is needed.

This approach does not modify the standard library. See:

I got an error when I tried to process luigi in parallel on windows, but the solution
Pickle crashing when trying to pickle "update_tracking_url" in luigi.worker?

Finally

In my own use so far this has not caused any problems, but please make these changes at your own risk.
