It's really useful to add save() and load() methods to Target in Luigi

The title says it all!

... But that alone would be a bit thin, so here are a few examples.

As anyone who has used it knows, the workflow manager Luigi abstracts the data produced by a task as a subclass of `Target`. At a minimum, a target works as long as it implements an `exists()` method that reports whether the data is already there. For a local file you use `LocalTarget`, and if you simply follow the tutorial, the code ends up looking like this.

from luigi import Task, ExternalTask, LocalTarget
import pandas as pd


class RawFile(ExternalTask):

    def output(self):
        return LocalTarget('path/to/file.csv')


class Aggregation(Task):

    def requires(self):
        yield RawFile()

    def run(self):
        df = pd.read_csv(self.input()[0].path, sep='\t', parse_dates=[7, 8], encoding='cp932')
        ...

Fine so far. Now suppose another task comes along that also depends on RawFile. Before long you find yourself starting to write code like this ...

class Plot(Task):

    def requires(self):
        yield RawFile()

    def run(self):
        df = pd.read_csv(

Wait a minute. Do you remember the right options to pass to the CSV parser for every project you're involved in? **I don't.** Do you want to write those options out every time? I **absolutely** don't want to write them over and over. Luigi's Target abstracts the path to the source data, but stopping there is clearly clumsy. That's right: **you just have to abstract the reading as well**.

from luigi import Task, ExternalTask, LocalTarget
import pandas as pd


class RawFileTarget(LocalTarget):

    path = 'path/to/file.csv'

    def __init__(self):
        super(RawFileTarget, self).__init__(self.path)

    def load(self):
        return pd.read_csv(self.path, sep='\t', parse_dates=[7, 8], encoding='cp932')

class RawFile(ExternalTask):

    def output(self):
        return RawFileTarget()

In other words, define a Target subclass that provides a load() method.

That way, every dependent task becomes:

class Aggregation(Task):

    def requires(self):
        yield RawFile()

    def run(self):
        df = self.input()[0].load()
        ...


class Plot(Task):

    def requires(self):
        yield RawFile()

    def run(self):
        df = self.input()[0].load()
        ...

Much cleaner. Later, if the person in charge of the data hands you a CSV file with a different layout, saying "Sorry, the one I gave you before was a mistake ...", or even if the file you are handed turns out to be Excel, you only have to rewrite RawFileTarget.load(). Happy!

As the title says, it's just as nice to write a save() that mirrors load(), but I won't reproduce my own code for it here.
