Data pipeline construction with Python and Luigi

Introduction

This article is a Japanese translation of Building Data Pipelines with Python and Luigi. The original article was well written, so I translated it for my own understanding, even though I was unsure about some parts. If you find any mistakes, please let me know in the comments.

Data pipeline construction with Python and Luigi

For data scientists, day-to-day work is often more research and development than engineering. Nonetheless, going from prototype to product requires a fair amount of re-engineering effort, with quick and dirty decisions being the second-best option ^1. This always slows down innovation and, generally speaking, delays the entire project.

This article discusses the experience of building a data pipeline: data extraction, cleaning, merging, preprocessing, and all the common steps needed to prepare data for a data-driven product. The particular focus is on **data plumbing**, and on how a workflow manager like Luigi can come to the rescue without getting in your way, allowing a smooth transition from prototype to product with minimal effort.

Sample code is available as a GitHub Gist.

Early days of a prototype

In past prototypes, the data pipeline looked roughly like this:

$ python get_some_data.py
$ python clean_some_data.py
$ python join_other_data.py
$ python do_stuff_with_data.py

This is quite common in the preliminary, experimental phase of a data project: some preprocessing is required, it tends to end up as a quick hack ^2, and engineering best practices get set aside. Then the number of scripts grows and the data pipeline gets out of hand.

The only advantage of this approach is that it is quick and hacky. The downside is that it is tedious: every time you want to rerun the pipeline, you have to call a bunch of scripts manually, one after another. Sharing the prototype with colleagues also invites misunderstandings (such as "Why doesn't do_stuff_with_data work?", "Did you run clean_some_data first?", etc.).

An obvious, though still hacky, solution is to push everything into one script. After some quick refactoring, the do_everything.py script would look like this:

if __name__ == '__main__':
    get_some_data()
    clean_some_data()
    join_other_data()
    do_stuff_with_data()

Running it is easy:

$ python do_everything.py

(Note: You can put everything together in a bash script that calls a bunch of scripts in sequence, but the drawbacks remain the same)

Boilerplate code

When we move on to a production-ready pipeline, there are a few more aspects to consider besides the code that runs everything. In particular, error handling should be taken into account:

try:
    get_some_data()
except GetSomeDataError as e:
    # Error handling goes here
    pass

But when all the tasks are put together, it turns into a try / except Christmas tree:

try:
    get_some_data()
    try:
        clean_some_data()
        try:
            # Do something here...
            pass
        except EvenMoreErrors:
            # ...
            pass
    except CleanSomeDataError as e:
        # Handle CleanSomeDataError
        pass
except GetSomeDataError as e:
    # Handle GetSomeDataError
    pass

Another important consideration is how to resume the pipeline. For example, if the first few tasks complete but an error occurs along the way, how can you rerun the pipeline without rerunning the steps that already succeeded?

# Check whether the task has already succeeded
if not i_got_the_data_already():
    # If not, run it
    try:
        get_some_data()
    except GetSomeDataError as e:
        # Error handling goes here
        pass
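The check-then-run pattern can be factored into a small helper. The following is a minimal sketch and not part of the original article: `run_if_missing` is a hypothetical name, and it assumes that the existence of an output file is what stands in for `i_got_the_data_already()`. The step writes to a temporary path that is renamed only on success, so a failed run does not leave behind a partial file that looks like a finished one.

```python
import os


def run_if_missing(output_path, step):
    """Run `step` only when `output_path` does not exist yet.

    Hypothetical helper: the output file acts as the checkpoint.
    Returns True if the step was executed, False if it was skipped.
    """
    if os.path.exists(output_path):
        return False  # finished in a previous run, nothing to do

    tmp_path = output_path + ".tmp"
    try:
        step(tmp_path)
    except Exception:
        # Clean up the partial output, then let the error propagate.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
    # Publish the result only after the step has succeeded.
    os.replace(tmp_path, output_path)
    return True


def get_some_data(path):
    # Stand-in for the real extraction step: writes a placeholder file.
    with open(path, "w") as f:
        f.write("raw data\n")
```

Calling `run_if_missing("data.txt", get_some_data)` twice runs the step the first time and skips it the second time. Every step in the pipeline would need the same wrapping, which is the kind of repetition a workflow manager takes off your hands.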

Enter Luigi

Luigi is a Python tool for workflow management, developed by Spotify to help build complex data pipelines of batch jobs. To install Luigi:

pip install luigi

The useful features of Luigi are:

- Dependency management
- Checkpoints / recovery from failure
- CLI integration / parameterization
- Dependency graph visualization

There are two key concepts for understanding how Luigi can be applied to your data pipeline: tasks and targets. A task is a unit of work, represented by a class that inherits from luigi.Task and overrides a few basic methods. The output of a task is a target, which can be a file on the local file system, a file on Amazon S3, or some data in a database.

Dependencies can be defined on inputs and outputs. For example, if task B depends on task A, it means that the output of task A is the input of task B.

Let's take a look at some typical tasks:

# Filename: run_luigi.py
import luigi
 
class PrintNumbers(luigi.Task):
 
    def requires(self):
        return []
 
    def output(self):
        return luigi.LocalTarget("numbers_up_to_10.txt")
 
    def run(self):
        with self.output().open('w') as f:
            for i in range(1, 11):
                f.write("{}\n".format(i))
 
class SquaredNumbers(luigi.Task):
 
    def requires(self):
        return [PrintNumbers()]
 
    def output(self):
        return luigi.LocalTarget("squares.txt")
 
    def run(self):
        with self.input()[0].open() as fin, self.output().open('w') as fout:
            for line in fin:
                n = int(line.strip())
                out = n * n
                fout.write("{}:{}\n".format(n, out))
                 
if __name__ == '__main__':
    luigi.run()

This code presents two tasks: PrintNumbers, which writes the numbers from 1 to 10, one per line, to a file called numbers_up_to_10.txt, and SquaredNumbers, which reads that file and writes each number paired with its square, again one per line, to a file called squares.txt.

To run these tasks:

$ python run_luigi.py SquaredNumbers --local-scheduler

Luigi checks the dependencies between the tasks, sees that the input of SquaredNumbers does not exist yet, and therefore runs the PrintNumbers task first and then SquaredNumbers.

The first argument passed to Luigi is the name of the last task in the pipeline you want to run. The second argument simply tells Luigi to use the local scheduler (more on that later).

You can also use the luigi command:

$ PYTHONPATH='.' luigi --module run_luigi SquaredNumbers --local-scheduler
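To make the dependency check more concrete, here is a toy model in plain Python of what happens when the last task of a pipeline is requested. It is a sketch for illustration only, not Luigi's actual implementation: `ToyTask` and `run_with_dependencies` are hypothetical names, and a task is assumed to be complete when its output file exists.

```python
import os


class ToyTask:
    """Toy stand-in for a workflow task (not the luigi.Task API)."""

    def __init__(self, name, output_path, requires=()):
        self.name = name
        self.output_path = output_path
        self.requires = list(requires)

    def complete(self):
        # Assumption: a task is done when its output file exists.
        return os.path.exists(self.output_path)

    def run(self):
        with open(self.output_path, "w") as f:
            f.write(self.name + "\n")


def run_with_dependencies(task, executed=None):
    """Run `task` after its dependencies, skipping completed tasks.

    Returns the names of the tasks that were actually run, in order.
    """
    if executed is None:
        executed = []
    if task.complete():
        return executed  # output already there, nothing to do
    for dependency in task.requires:
        run_with_dependencies(dependency, executed)
    task.run()
    executed.append(task.name)
    return executed
```

Requesting the downstream task runs the missing upstream task first. Requesting it again runs nothing, because both outputs already exist. This is the same behaviour seen when the Luigi command is repeated.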

Task skeleton

To create a Luigi task, simply create a class with luigi.Task as the parent and override some methods. In particular:

- `requires()` is the list of tasks this task depends on
- `output()` is the target of the task (e.g. LocalTarget, S3Target, etc.)
- `run()` is the execution logic

Luigi checks the return values of `requires()` and `output()` and builds the dependency graph accordingly.

Pass parameters

Hard-coded filenames and settings are generally anti-patterns. Once you understand the structure and dynamics of a task, you should parameterize your settings so that you can dynamically call the same script with different arguments.

This is what the luigi.Parameter() class is for. Each Luigi task can have several parameters. For example, suppose we want to make the upper limit in the previous example configurable. Since the value is passed to range() as an integer, we can use luigi.IntParameter rather than the default parameter class. The modified tasks look like this:

class PrintNumbers(luigi.Task):
    n = luigi.IntParameter()
 
    def requires(self):
        return []
 
    def output(self):
        return luigi.LocalTarget("numbers_up_to_{}.txt".format(self.n))
 
    def run(self):
        with self.output().open('w') as f:
            for i in range(1, self.n+1):
                f.write("{}\n".format(i))
 
class SquaredNumbers(luigi.Task):
    n = luigi.IntParameter()
 
    def requires(self):
        return [PrintNumbers(n=self.n)]
 
    def output(self):
        return luigi.LocalTarget("squares_up_to_{}.txt".format(self.n))
 
    def run(self):
        with self.input()[0].open() as fin, self.output().open('w') as fout:
            for line in fin:
                n = int(line.strip())
                out = n * n
                fout.write("{}:{}\n".format(n, out))

To run the SquaredNumbers task for the numbers up to 20:

$ python run_luigi.py SquaredNumbers --local-scheduler --n 20

Parameters can also have default values. For example:

n = luigi.IntParameter(default=10)

In this case, 10 is used unless the --n argument is specified.

The sample code is available as a GitHub Gist.

Local vs Global Scheduler

So far we have used the --local-scheduler option to run Luigi tasks with the local scheduler. This is useful for development, but in production environments you should use the central scheduler (see the scheduler documentation).

This has several advantages:

- It avoids running two instances of the same task at the same time
- It provides a nice web-based visualization

To run the Luigi scheduler daemon in the foreground:

$ luigid

In the background:

$ luigid --background

It uses port 8082 by default, so you can see the visualization by opening http://localhost:8082/ in your browser.

With the global Luigi scheduler running, the tasks can be rerun without the local scheduler option:

$ python run_luigi.py SquaredNumbers --n [BIG_NUMBER]

The sample code finishes in milliseconds, so if you want to switch to the browser and see the dependency graph while the tasks are still running, you should pass a large number, say 10,000,000 or more, to the --n option.

The screenshot of the dependency graph is: dependency-graph-screenshot.png

Summary

We discussed how to define a data pipeline using Luigi, a workflow manager written in Python. Luigi provides a nice abstraction of the pipeline in terms of tasks and targets, and it takes care of the dependencies for you.

From the perspective of code reuse and of moving from prototype to product, I find it useful to define the business logic of the tasks in separate [Python packages](http://marcobonzanini.com/2015/07/01/how-to-develop-and-distribute-python-packages/) (that is, with a setup.py file). This way, you can simply write `import your_package` in your Luigi script and call it from there.

It's possible for a task to produce multiple files as output, but if that happens, you should probably consider whether the task can be divided into smaller units (i.e. multiple tasks). Are the outputs logically the same? Are there any dependencies between them? If you can't split the task, I think it is simple and convenient to make `output()` a log file whose name combines the name of the task itself, the timestamp, and so on. The log file name will be something like `TaskName_timestamp_param1value_param2value_etc`.
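The naming scheme for such a log file can be sketched as a small helper. This is only an illustration: `log_file_name`, the timestamp format and the `.log` extension are assumptions, not something prescribed by the original article or by Luigi.

```python
from datetime import datetime


def log_file_name(task_name, params, timestamp):
    """Build a name like TaskName_timestamp_param1value_param2value.log.

    Hypothetical helper. `params` maps parameter names to values; the
    values are appended in sorted key order so that the same inputs
    always produce the same file name.
    """
    parts = [task_name, timestamp.strftime("%Y%m%dT%H%M%S")]
    parts.extend(str(params[key]) for key in sorted(params))
    return "_".join(parts) + ".log"


# Example: a fixed timestamp and a single parameter n=20.
name = log_file_name("SquaredNumbers", {"n": 20}, datetime(2015, 7, 1, 12, 30, 0))
```

The timestamp is passed in explicitly (for example, as a task parameter) rather than read from the clock inside `output()`. If `output()` returned a different target on every call, the check for whether the task is already complete would never find the file.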

Workflow managers like Luigi are generally useful: they handle dependencies, reduce the amount of boilerplate code for parameter and error handling, manage failure recovery, and encourage a clear pattern for developing data pipelines.

It is important to consider the limitations as well:

- Luigi is built for batch jobs, so it is probably not suitable for near real-time processing
- It does not trigger execution for you; you still need to run the data pipeline yourself (e.g. through a cron job)

