The other day, I received a request to "embed the notebooks used for data analysis into the pipeline as they are," but I couldn't find a package that could do that, so I decided to build it myself.
That said, "building it myself" just meant adding a small feature on top of an existing pipeline framework. This time I adopted Kedro for the Pipeline and Papermill to incorporate the Notebooks as they are.
I chose Kedro because its Pipeline configuration is simple (you just write functions and their I/O), its documentation is extensive, and the learning cost seems low. Learning cost matters a lot when proposing a tool for someone else to use. Also, as Mr. Kinui says[^kinuit-kedro], the logo is cool.
There are three main features:

- Run Notebooks from Kedro using Papermill
- Define Pipelines in YAML
- Version control of Notebook output by Papermill
The figure below is a visualization of Kedro's Hello World project with Kedro-Viz. The rectangles represent functions and the rounded rectangles represent data. The idea is that each of these rectangles becomes a Notebook.
The Pipeline YAML is written as follows. For example, `example_train_x`, which is an output of `split_data`, is an input of `train_model`; this represents the flow (the arrows) of the Pipeline.
conf/base/pipelines.yml
```yaml
# data_engineering pipeline
data_engineering:
  # split_data node
  split_data:
    nb:
      input_path: notebooks/data_engineering/split_data.ipynb
      parameters:
        test_data_ratio: 0.2
    inputs:
      - example_iris_data
    outputs:
      - example_train_x
      - example_train_y
      - example_test_x
      - example_test_y

# data_science pipeline
data_science:
  # train_model node
  train_model:
    nb:
      input_path: notebooks/data_science/train_model.ipynb
      parameters:
        num_iter: 10000
        lr: 0.01
      versioned: True
    inputs:
      - example_train_x
      - example_train_y
    outputs:
      - example_model
  # predict node
  predict:
    nb:
      input_path: notebooks/data_science/predict.ipynb
      versioned: True
    inputs:
      - example_model
      - example_test_x
    outputs:
      - example_predictions
  # report_accuracy node
  report_accuracy:
    nb:
      input_path: notebooks/data_science/report_accuracy.ipynb
      versioned: True
    inputs:
      - example_predictions
      - example_test_y
```
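For readers new to Kedro, the `train_model` entry above corresponds roughly to a hand-written node definition like the following sketch. Here `run_train_model_notebook` is a hypothetical placeholder for the Papermill-executing callable introduced later in this article, not code from the repository.

```python
from kedro.pipeline import Pipeline, node


def run_train_model_notebook(example_train_x, example_train_y):
    # Hypothetical placeholder: in this project the callable is a NotebookExecuter
    # instance that runs train_model.ipynb via Papermill and returns the datasets
    # that the Notebook saved through the Data Catalog.
    example_model = ...  # stand-in for the trained model produced by the Notebook
    return {'example_model': example_model}


data_science = Pipeline([
    node(
        run_train_model_notebook,
        ['example_train_x', 'example_train_y'],
        {'example_model': 'example_model'},
        name='train_model',
    ),
])
```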
For example, if you write `pipelines.yml` as follows, the Notebook output is written to `data/08_reporting/train_model#num_iter=10000&lr=0.01.ipynb/<version>/train_model#num_iter=10000&lr=0.01.ipynb`, where `<version>` is a date-time string formatted as `YYYY-MM-DDThh.mm.ss.sssZ`.
conf/base/pipelines.yml
```yaml
# data_science pipeline
data_science:
  # train_model node
  train_model:
    nb:
      input_path: notebooks/data_science/train_model.ipynb
      parameters:
        num_iter: 10000
        lr: 0.01
      versioned: True
    inputs:
      - example_train_x
      - example_train_y
    outputs:
      - example_model
```
I haven't been able to maintain it properly yet... The general flow of usage is as follows.
Create an environment from the template project with the following commands.

```bash
$ git clone https://github.com/hrappuccino/kedro-notebook-project.git
$ cd kedro-notebook-project
$ pipenv install
$ pipenv shell
```
Register all the data that appears in the Pipeline (including intermediate products) in the Data Catalog.
conf/base/catalog.yaml
```yaml
example_iris_data:
  type: pandas.CSVDataSet
  filepath: data/01_raw/iris.csv
example_train_x:
  type: pickle.PickleDataSet
  filepath: data/05_model_input/example_train_x.pkl
example_train_y:
  type: pickle.PickleDataSet
  filepath: data/05_model_input/example_train_y.pkl
example_test_x:
  type: pickle.PickleDataSet
  filepath: data/05_model_input/example_test_x.pkl
example_test_y:
  type: pickle.PickleDataSet
  filepath: data/05_model_input/example_test_y.pkl
example_model:
  type: pickle.PickleDataSet
  filepath: data/06_models/example_model.pkl
example_predictions:
  type: pickle.PickleDataSet
  filepath: data/07_model_output/example_predictions.pkl
```
Please refer to the Kedro documentation for how to write the Data Catalog.
Basically, you can write the Notebook as usual; only the following two points differ from the usual workflow:

- Use Kedro's Data Catalog for data input / output
- Parameterize the Notebook for Papermill
Launch Jupyter Notebook / Lab from Kedro.
```bash
$ kedro jupyter notebook
$ kedro jupyter lab
```
Execute the following magic command in the Notebook. You can then use a global variable called `catalog`.

```python
%reload_kedro
```
To load / save data, write as follows.

```python
data = catalog.load('example_iris_data')
catalog.save('example_train_x', train_x)
```
For more information on how to use Kedro from Jupyter, please refer to the Kedro documentation.
To parameterize a Notebook, tag the cell containing the default values with `parameters`.
Please refer to the Papermill documentation for how to do this.
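As a reference, here is a minimal sketch of what the cells of a Notebook such as `split_data.ipynb` might look like under these two conventions. The split logic and column handling are illustrative assumptions, not the actual contents of the repository's notebook; `catalog` is the global variable made available by `%reload_kedro`.

```python
# Cell tagged `parameters` -- Papermill overwrites this value with the
# `parameters` given in pipelines.yml (here, test_data_ratio: 0.2).
test_data_ratio = 0.2

# Load the input dataset registered in Kedro's Data Catalog.
data = catalog.load('example_iris_data')

# Illustrative split logic -- the real notebook may differ.
n_test = int(len(data) * test_data_ratio)
test, train = data.iloc[:n_test], data.iloc[n_test:]

# Save the outputs through the Data Catalog. The "Saving data to `...`" log
# lines produced by these calls are what the pipeline later uses to collect
# the Notebook's outputs.
catalog.save('example_train_x', train.iloc[:, :-1])
catalog.save('example_train_y', train.iloc[:, -1])
catalog.save('example_test_x', test.iloc[:, :-1])
catalog.save('example_test_y', test.iloc[:, -1])
```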
Write the Pipeline in YAML as follows (reposted from above):
conf/base/pipelines.yml
```yaml
# data_engineering pipeline
data_engineering:
  # split_data node
  split_data:
    nb:
      input_path: notebooks/data_engineering/split_data.ipynb
      parameters:
        test_data_ratio: 0.2
    inputs:
      - example_iris_data
    outputs:
      - example_train_x
      - example_train_y
      - example_test_x
      - example_test_y

# data_science pipeline
data_science:
  # train_model node
  train_model:
    nb:
      input_path: notebooks/data_science/train_model.ipynb
      parameters:
        num_iter: 10000
        lr: 0.01
      versioned: True
    inputs:
      - example_train_x
      - example_train_y
    outputs:
      - example_model
  # predict node
  predict:
    nb:
      input_path: notebooks/data_science/predict.ipynb
      versioned: True
    inputs:
      - example_model
      - example_test_x
    outputs:
      - example_predictions
  # report_accuracy node
  report_accuracy:
    nb:
      input_path: notebooks/data_science/report_accuracy.ipynb
      versioned: True
    inputs:
      - example_predictions
      - example_test_y
```
Run all or part of the Pipeline.

```bash
$ kedro run
$ kedro run --pipeline=data_engineering
```
If you specify the `--parallel` option, processing is parallelized wherever possible.

```bash
$ kedro run --parallel
```
For more information on how to run the Pipeline, please refer to Kedro's documentation.
Run the following command and access http://127.0.0.1:4141/ to see the page shown below.

```bash
$ kedro viz
```
Run the following command and access http://127.0.0.1:5000/ to see the page shown below.

```bash
$ mlflow ui
```

Note: since I ran it from a Notebook, a single experiment is recorded as two rows.
Kedro + MLflow is also introduced in Kedro's blog.
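For reference, rows like these come from MLflow's standard tracking API. The following is only a sketch of the kind of logging a Notebook such as `report_accuracy.ipynb` might do; the parameter and metric values are placeholders and not taken from the repository.

```python
import mlflow

# Placeholder value; in the actual Notebook this would be computed from the predictions.
accuracy = 0.96

with mlflow.start_run():
    mlflow.log_param('num_iter', 10000)
    mlflow.log_param('lr', 0.01)
    mlflow.log_metric('accuracy', accuracy)
```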
I will briefly explain how it works.
To be precise, the Notebook is run with Papermill inside a function.
At the extreme, all you need to do is call `pm.execute_notebook`, but in order to separate the Notebook arguments from the Pipeline arguments, I made it a class that receives them in `__init__` and `__call__` respectively. At first I implemented it with a closure, but I got an error saying it could not be serialized when processing in parallel, so I turned it into a class.
`__get_default_output_path` handles the version control of the Notebook output by Papermill and is described in detail later.
src/kedro_local/nodes/nodes.py
```python
import papermill as pm
from pathlib import Path
import os, re, urllib.parse, datetime

DEFAULT_VERSION = datetime.datetime.now().isoformat(timespec='milliseconds').replace(':', '.') + 'Z'


def _extract_dataset_name_from_log(output_text):
    # Parse Kedro's "Saving data to `<dataset>`" log line emitted inside the Notebook.
    m = re.search('kedro.io.data_catalog - INFO - Saving data to `(\\w+)`', output_text)
    return m.group(1) if m else None


class NotebookExecuter:

    def __init__(self, catalog, input_path, output_path=None, parameters=None, versioned=False, version=DEFAULT_VERSION):
        self.__catalog = catalog
        self.__input_path = input_path
        self.__parameters = parameters
        self.__versioned = versioned
        self.__version = version
        self.__output_path = output_path or self.__get_default_output_path()

    def __call__(self, *args):
        nb = pm.execute_notebook(self.__input_path, self.__output_path, self.__parameters)
        # Collect the names of the datasets the Notebook saved through the Data Catalog.
        dataset_names = [
            _extract_dataset_name_from_log(output['text'])
            for cell in nb['cells'] if 'outputs' in cell
            for output in cell['outputs'] if 'text' in output
        ]
        return {dataset_name: self.__catalog.load(dataset_name) for dataset_name in dataset_names if dataset_name}

    def __get_default_output_path(self):
        # See below
```
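To make the behavior concrete, here is a minimal usage sketch. This standalone use is hypothetical (in the project the wiring is done by `create_pipelines`, shown next), and `catalog` is assumed to be the project's Data Catalog instance.

```python
executer = NotebookExecuter(
    catalog,  # assumption: the project's DataCatalog instance
    input_path='notebooks/data_engineering/split_data.ipynb',
    parameters={'test_data_ratio': 0.2},
)

# Runs the Notebook with Papermill and returns {dataset_name: data} for every
# dataset the Notebook saved through the Data Catalog.
outputs = executer()
```

Note that the positional arguments passed by Kedro are ignored (`*args`): the `inputs` declared in `pipelines.yml` only determine the execution order, while the Notebook itself loads its data from the Data Catalog.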
Read the YAML described above and create the Pipelines.
Basically, it just converts the YAML into objects with dictionary comprehensions.
The `__default__` Pipeline built at the end is the one executed when the `--pipeline` option is omitted from `kedro run`.
src/kedro_notebook_project/pipeline.py
```python
from kedro.pipeline import Pipeline, node
from kedro_local.nodes import *
import yaml


def create_pipelines(catalog, **kwargs):
    with open('conf/base/pipelines.yml') as f:
        pipelines_ = yaml.safe_load(f)
    # Turn each YAML entry into a Kedro node that executes a Notebook.
    pipelines = {
        pipeline_name: Pipeline([
            node(
                NotebookExecuter(catalog, **node_['nb']),
                node_['inputs'] if 'inputs' in node_ else None,
                {output: output for output in node_['outputs']} if 'outputs' in node_ else None,
                name=node_name,
            ) for node_name, node_ in nodes_.items()
        ]) for pipeline_name, nodes_ in pipelines_.items()
    }
    # The `__default__` pipeline is the concatenation of all defined pipelines.
    for pipeline_ in list(pipelines.values()):
        if '__default__' not in pipelines:
            pipelines['__default__'] = pipeline_
        else:
            pipelines['__default__'] += pipeline_
    return pipelines
```
It just rewrites the output destination according to the definition in `pipelines.yml`. Note that if `self.__parameters` is large, the file name becomes too long. It used to be hashed, but since a hash is not human-friendly, the parameters are tentatively converted into a query string instead.
src/kedro_local/nodes/nodes.py
```python
class NotebookExecuter:

    # abridgement

    def __get_default_output_path(self):
        name, ext = os.path.splitext(os.path.basename(self.__input_path))
        if self.__parameters:
            # Append the parameters as a query string to keep the file name human-readable.
            name += '#' + urllib.parse.urlencode(self.__parameters)
        name += ext
        output_dir = Path(os.getenv('PAPERMILL_OUTPUT_DIR', ''))
        if self.__versioned:
            # Insert a <file name>/<version> directory layer, mirroring Kedro's dataset versioning.
            output_dir = output_dir / name / self.__version
            output_dir.mkdir(parents=True, exist_ok=True)
        return str(output_dir / name)
```
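As a quick check of how these pieces combine for the `train_model` node above, here is a small sketch. It assumes `PAPERMILL_OUTPUT_DIR` is set to `data/08_reporting`, as implied by the earlier path example.

```python
import datetime
import urllib.parse

# Version string formatted the same way as DEFAULT_VERSION above.
version = datetime.datetime.now().isoformat(timespec='milliseconds').replace(':', '.') + 'Z'

# The parameters of the train_model node, encoded as a query string.
name = 'train_model#' + urllib.parse.urlencode({'num_iter': 10000, 'lr': 0.01}) + '.ipynb'

# Assuming PAPERMILL_OUTPUT_DIR=data/08_reporting, the output path becomes:
print(f'data/08_reporting/{name}/{version}/{name}')
# data/08_reporting/train_model#num_iter=10000&lr=0.01.ipynb/<version>/train_model#num_iter=10000&lr=0.01.ipynb
```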
Thank you for reading this far. All the source code presented in this article is available on my GitHub. If you are interested, please try it out and give me your feedback.