This article is a translation of Building Data Pipelines with Python and Luigi. The original article was excellent, so I translated it for my own understanding, though I wasn't confident in places. If you spot any mistakes, please let me know in the comments.
For data scientists, day-to-day work is often closer to research and development than to engineering. Nonetheless, the move from prototype to product requires a fair amount of re-engineering effort, and quick-and-dirty decisions along the way are, at best, a stopgap ^1. This invariably slows down innovation and, generally speaking, the entire project.
This article discusses the experience of building a data pipeline: extracting, cleaning, merging, and preprocessing data, and all the other common steps needed to prepare data for a data-driven product. The particular focus is **data plumbing**, and how a workflow manager like Luigi can come to the rescue without getting in your way, making the transition from prototype to product smooth, with minimal effort.
The sample code is available as a GitHub Gist.
In past prototypes, the data pipeline looked roughly like this:
```
$ python get_some_data.py
$ python clean_some_data.py
$ python join_other_data.py
$ python do_stuff_with_data.py
```
This is quite common in the preliminary, experimental phase of a data project: some preprocessing is needed, and it tends to be done as a quick hack ^2, without much regard for engineering best practices. The number of scripts swells, and the data pipeline starts falling apart.
The only advantage of this approach is that it's quick and hacky. The downside is that it's tedious: every time you want to rerun the pipeline, you have to call the scripts manually, one after another. Moreover, misunderstandings abound when you share the prototype with a colleague ("Why doesn't `do_stuff_with_data` work?", "Did you run `clean_some_data` first?", etc.).
An apparently less hacky solution is to push everything into a single script. After some quick refactoring, a `do_everything.py` script might look like this:
```python
if __name__ == '__main__':
    get_some_data()
    clean_some_data()
    join_other_data()
    do_stuff_with_data()
```
Running it is easy:
```
$ python do_everything.py
```
(Note: you could also put everything into a bash script that calls the individual scripts in sequence, but the drawbacks remain the same.)
When moving towards a production-ready pipeline, we need to think about a few aspects of the code beyond simply running all the steps. In particular, error handling should be considered:
```python
try:
    get_some_data()
except GetSomeDataError as e:
    # handle the error
```
But when all the tasks are chained together, it turns into a Christmas tree of try/except blocks:
```python
try:
    get_some_data()
    try:
        clean_some_data()
        try:
            # do something here...
        except EvenMoreErrors:
            # ...
    except CleanSomeDataError as e:
        # handle CleanSomeDataError
except GetSomeDataError as e:
    # handle GetSomeDataError
```
Another important consideration is how to resume the pipeline. For example, if the first few tasks complete but an error occurs somewhere along the way, how can you rerun the pipeline without re-executing the steps that already succeeded?
```python
# check whether the task has already completed successfully
if not i_got_the_data_already():
    # if not, run it
    try:
        get_some_data()
    except GetSomeDataError as e:
        # handle the error
```
Luigi is a Python tool for workflow management, developed at Spotify to help build complex pipelines of batch jobs. To install Luigi:
```
$ pip install luigi
```
The useful features of Luigi are:
- Dependency management
- Checkpointing / recovery from failures
- CLI integration / parameterization
- Dependency graph visualization
There are two key concepts for understanding how Luigi applies to your data pipeline: tasks and targets. A task is a unit of work, defined by subclassing `luigi.Task` and overriding a few basic methods. The output of a task is a target, which may live on the local file system, on Amazon S3, or in a database.
Dependencies are defined in terms of inputs and outputs: if task B depends on task A, it means that the output of task A is the input of task B.
Let's take a look at some typical tasks:
```python
# Filename: run_luigi.py
import luigi

class PrintNumbers(luigi.Task):

    def requires(self):
        return []

    def output(self):
        return luigi.LocalTarget("numbers_up_to_10.txt")

    def run(self):
        with self.output().open('w') as f:
            for i in range(1, 11):
                f.write("{}\n".format(i))

class SquaredNumbers(luigi.Task):

    def requires(self):
        return [PrintNumbers()]

    def output(self):
        return luigi.LocalTarget("squares.txt")

    def run(self):
        with self.input()[0].open() as fin, self.output().open('w') as fout:
            for line in fin:
                n = int(line.strip())
                out = n * n
                fout.write("{}:{}\n".format(n, out))

if __name__ == '__main__':
    luigi.run()
```
This code defines two tasks: `PrintNumbers`, which writes the numbers from 1 to 10 to a file called `numbers_up_to_10.txt`, one per line, and `SquaredNumbers`, which reads that file and writes each number paired with its square, line by line, to a file called `squares.txt`.
To run these tasks:
```
$ python run_luigi.py SquaredNumbers --local-scheduler
```
Luigi takes care of the dependency checking between tasks: it finds that the input of `SquaredNumbers` does not exist yet, so it first runs the `PrintNumbers` task and then `SquaredNumbers`.
The first argument passed to Luigi is the name of the last task in the pipeline that you want to run. The second argument simply tells Luigi to use the local scheduler (more on this later).
You can also use the `luigi` command:
```
$ luigi -m run_luigi.py SquaredNumbers --local-scheduler
```
To create a Luigi task, simply define a class with `luigi.Task` as its parent and override a few methods. In particular:

- `requires()` returns the list of tasks this task depends on
- `output()` returns the target of the task (e.g. `LocalTarget`, `S3Target`, etc.)
- `run()` contains the execution logic

Luigi inspects the return values of `requires()` and `output()` and builds the dependency graph accordingly.
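As a note of my own (this snippet is not in the original article): `requires()` can return several tasks, and `self.input()` then yields the corresponding targets in the same order. A minimal sketch, assuming it lives in the same `run_luigi.py` as the example above:

```python
class CombineOutputs(luigi.Task):

    def requires(self):
        # two dependencies: self.input() will hold their targets, in order
        return [PrintNumbers(), SquaredNumbers()]

    def output(self):
        return luigi.LocalTarget("combined.txt")

    def run(self):
        with self.output().open('w') as fout:
            for target in self.input():
                with target.open() as fin:
                    fout.write(fin.read())
```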
Hard-coded filenames and settings are generally an anti-pattern. Once you understand the structure and dynamics of a task, you should parameterize it so that the same script can be called dynamically with different arguments.
This is where the `luigi.Parameter()` class comes in. Each Luigi task can take a number of parameters. Let's say, for example, that we want to make the number in the previous example configurable. Since the parameter is used as an integer in the `range()` function, we can use `luigi.IntParameter` rather than the default parameter class. The modified tasks look like this:
```python
class PrintNumbers(luigi.Task):
    n = luigi.IntParameter()

    def requires(self):
        return []

    def output(self):
        return luigi.LocalTarget("numbers_up_to_{}.txt".format(self.n))

    def run(self):
        with self.output().open('w') as f:
            for i in range(1, self.n + 1):
                f.write("{}\n".format(i))

class SquaredNumbers(luigi.Task):
    n = luigi.IntParameter()

    def requires(self):
        return [PrintNumbers(n=self.n)]

    def output(self):
        return luigi.LocalTarget("squares_up_to_{}.txt".format(self.n))

    def run(self):
        with self.input()[0].open() as fin, self.output().open('w') as fout:
            for line in fin:
                n = int(line.strip())
                out = n * n
                fout.write("{}:{}\n".format(n, out))
```
To call the `SquaredNumbers` task for numbers up to 20:
```
$ python run_luigi.py SquaredNumbers --local-scheduler --n 20
```
Parameters can also have default values. For example:
```python
n = luigi.IntParameter(default=10)
```
In this case, 10 is used unless the `--n` argument is specified.
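Luigi also ships with more specific parameter classes, such as `luigi.Parameter` for strings and `luigi.DateParameter` for dates. As a quick sketch of my own (the task and file names here are hypothetical, not from the article):

```python
import datetime
import luigi

class FetchLogs(luigi.Task):
    date = luigi.DateParameter(default=datetime.date.today())
    source = luigi.Parameter(default="web")  # plain string parameter

    def output(self):
        # embed the parameter values in the target name, so each
        # (source, date) combination gets its own output file
        return luigi.LocalTarget("logs_{}_{}.txt".format(self.source, self.date))

    def run(self):
        with self.output().open('w') as f:
            f.write("logs for {} on {}\n".format(self.source, self.date))
```

Assuming the task is added to `run_luigi.py`, a `DateParameter` is passed on the command line in ISO format:

```
$ python run_luigi.py FetchLogs --date 2015-11-04 --local-scheduler
```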
So far, I've used the `--local-scheduler` option to run Luigi tasks with the local scheduler. This is useful for development, but in a production environment you should use the centralized scheduler (see the documentation on the scheduler).
This has several advantages:
- It avoids running two instances of the same task at the same time
- It provides a nice web-based visualization
To run the Luigi scheduler daemon in the foreground:
$ luigid
In the background:
$ luigid --background
It uses port 8082 by default, so you can see the visualization by pointing your browser at http://localhost:8082/.
With the centralized Luigi scheduler running, we can rerun the code without the local-scheduler option:
```
$ python run_luigi.py SquaredNumbers --n [BIG_NUMBER]
```
The sample code finishes in a matter of milliseconds, so if you want to switch to the browser and look at the dependency graph while the tasks are still running, you should probably pass a large number to the `--n` option, say 10,000,000 or more.
Here is a screenshot of the dependency graph:
We've discussed how to define a data pipeline using Luigi, a workflow manager written in Python. Luigi provides a nice abstraction of a pipeline in terms of tasks and targets, and takes care of dependencies for you.
In terms of code reuse, and with the mindset of moving from prototype to product, I find it useful to define the business logic of the tasks in separate [Python packages](http://marcobonzanini.com/2015/07/01/how-to-develop-and-distribute-python-packages/) (i.e. with a `setup.py` file). That way, you can simply write `import your_package` in your Luigi script and call the package from there.
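A minimal sketch of this layout, assuming a hypothetical installable package called `your_package` exposing a `clean_some_data()` function, and a hypothetical upstream task `GetData`:

```python
import luigi

# business logic lives in its own installable package (hypothetical name),
# so it can be tested and reused independently of Luigi
from your_package import clean_some_data

class CleanData(luigi.Task):

    def requires(self):
        return [GetData()]  # hypothetical upstream task

    def output(self):
        return luigi.LocalTarget("clean_data.txt")

    def run(self):
        with self.input()[0].open() as fin, self.output().open('w') as fout:
            fout.write(clean_some_data(fin.read()))
```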
A task can produce multiple files as output, but if that happens, you should probably consider whether the task can be broken down into smaller units (i.e. multiple tasks). Are the outputs logically the same? Are there dependencies between them? If you can't split the task, I find it simple and convenient to make `output()` a log file whose name combines the name of the task itself, a timestamp, and so on. The name of the log file would be something like `TaskName_timestamp_param1value_param2value_etc`.
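A minimal sketch of that naming scheme (my own hypothetical implementation, not from the article):

```python
import datetime
import luigi

class DoStuff(luigi.Task):
    param1 = luigi.Parameter(default="foo")

    def output(self):
        # e.g. DoStuff_2015-11-04_foo.log; using a date rather than a full
        # timestamp keeps the target name stable within a day
        fname = "{}_{}_{}.log".format(
            self.__class__.__name__, datetime.date.today(), self.param1)
        return luigi.LocalTarget(fname)

    def run(self):
        # ... do the actual work, then write the log that marks completion
        with self.output().open('w') as f:
            f.write("done\n")
```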
Generally speaking, a workflow manager like Luigi is helpful because it handles dependencies, reduces the amount of boilerplate code needed for parameter and error handling, manages recovery from failures, and encourages clear patterns when developing a data pipeline.
It's important to consider its limits as well:

- Luigi was developed for batch jobs, so it's probably not useful for near-real-time processing
- It does not trigger execution for you; you still need to run the data pipeline yourself (e.g. through a cronjob, as sketched below)
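For example (a sketch of my own, with hypothetical paths), a crontab entry that kicks off the pipeline every night at 1am could look like this:

```
0 1 * * * cd /path/to/project && python run_luigi.py SquaredNumbers --n 1000
```

This assumes the centralized scheduler daemon is already running; otherwise, add `--local-scheduler` to the command.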