**We will also modify the standard library.**
**No modification is needed unless you are on Windows and want to run jobs in parallel.**
Luigi is a kind of job scheduler. When jobs depend on each other, it runs them in the correct order; jobs with no dependency between them are executed in parallel.
See the official page for details: spotify/luigi
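As a quick illustration (not from the original post; the task names `Extract` and `Transform` are made up), a dependency is declared through `requires()`, and luigi then runs `Extract` before `Transform`:

```python
import luigi


class Extract(luigi.Task):
    def output(self):
        return luigi.LocalTarget("extract.txt")

    def run(self):
        with self.output().open("w") as f:
            f.write("raw data\n")


class Transform(luigi.Task):
    # Declaring Extract as a requirement makes luigi run it first.
    def requires(self):
        return Extract()

    def output(self):
        return luigi.LocalTarget("transform.txt")

    def run(self):
        with self.input().open() as src, self.output().open("w") as dst:
            dst.write(src.read().upper())
```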
Only on Windows does parallel execution fail.
The reason is probably that luigi passes jobs between processes using pickle, and on Windows some of the objects involved cannot be serialized by the standard pickle implementation.
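A minimal illustration of the underlying limitation (not luigi code): on Windows, multiprocessing starts workers with the spawn method and must pickle everything handed to the child process, but locally defined functions, like the nested callbacks luigi creates, cannot be pickled by the standard `pickle` module:

```python
import pickle


def make_callback():
    # A function defined inside another function, similar in spirit to the
    # update_tracking_url callback that luigi defines.
    def update_tracking_url(url):
        print(url)
    return update_tracking_url


try:
    pickle.dumps(make_callback())
except (AttributeError, pickle.PicklingError) as e:
    # e.g. "Can't pickle local object 'make_callback.<locals>.update_tracking_url'"
    print("pickle failed:", e)
```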
The workaround is to rewrite the library. There are two files to modify: luigi's worker.py and the standard library's multiprocessing/reduction.py.
Lib/site-packages/luigi/worker.py
```python
# Add the import
from functools import partial


# Inside TaskProcess.__init__
class TaskProcess(multiprocessing.Process):
    ...
    def __init__(self, task, worker_id, result_queue, tracking_url_callback,
                 status_message_callback, use_multiprocessing=False, worker_timeout=0):
        ...
        # self.tracking_url_callback = tracking_url_callback
        self.tracking_url_callback = partial(tracking_url_callback, task)
        # self.status_message_callback = status_message_callback
        self.status_message_callback = partial(status_message_callback, task)
        ...
    ...


class Worker:
    ...
    # Moved here from inside Worker._create_task_process
    def _update_tracking_url(self, task, tracking_url):
        self._scheduler.add_task(
            task_id=task.task_id,
            worker=self._id,
            status=RUNNING,
            tracking_url=tracking_url,
        )

    # Moved here from inside Worker._create_task_process
    def _update_status_message(self, task, message):
        self._scheduler.set_task_status_message(task.task_id, message)

    def _create_task_process(self, task):
        # def update_tracking_url(tracking_url):
        #     self._scheduler.add_task(
        #         task_id=task.task_id,
        #         worker=self._id,
        #         status=RUNNING,
        #         tracking_url=tracking_url,
        #     )

        # def update_status_message(message):
        #     self._scheduler.set_task_status_message(task.task_id, message)

        return TaskProcess(
            task, self._id, self._task_result_queue,
            self._update_tracking_url, self._update_status_message,
            use_multiprocessing=bool(self.worker_processes > 1),
            worker_timeout=self._config.timeout
        )
    ...
```
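The point of the change is that a `functools.partial` built from a picklable function (here, a bound method of `Worker`) is itself picklable, whereas the original nested functions were not. A standalone sketch of that property, using a module-level function and a string as a stand-in for the task object:

```python
import pickle
from functools import partial


def update_tracking_url(task, tracking_url):
    # Module-level function: picklable, unlike a nested closure.
    print(task, tracking_url)


cb = partial(update_tracking_url, "SomeTask")   # bind the task up front
restored = pickle.loads(pickle.dumps(cb))       # round-trips without error
restored("http://localhost:8082")
```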
Lib/multiprocessing/reduction.py
```python
# In the import section at the top of the file
# import pickle
import dill as pickle
```
dill can be installed with pip (`pip install dill`).
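To see why swapping in dill helps, a quick sketch (assuming dill is installed): dill serializes objects that the standard pickle module rejects, such as the nested function from the earlier example:

```python
import dill


def make_callback():
    def update_tracking_url(url):   # nested function: standard pickle rejects this
        print(url)
    return update_tracking_url


payload = dill.dumps(make_callback())    # succeeds where pickle.dumps fails
dill.loads(payload)("http://localhost:8082")
```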
Using an older version: in that case, not much modification is needed, and the standard library is not touched.
When I parallelized luigi on Windows I got an error; the solution is described in the Stack Overflow question "Pickle crashing when trying to pickle "update_tracking_url" in luigi.worker?".
As far as my own use goes it has not been a problem, but please modify at your own risk.
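For reference, once the files have been modified, parallel execution is requested in the usual way; `Transform` here is the made-up task from the first sketch, imported from a hypothetical module:

```python
import luigi
from mytasks import Transform    # hypothetical module holding the example task

if __name__ == "__main__":       # guard is required on Windows (spawn start method)
    luigi.build([Transform()], workers=4, local_scheduler=True)
```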