L'autre jour, j'ai reçu une demande "d'intégrer le bloc-notes utilisé pour l'analyse des données dans Pipeline tel quel", mais je n'ai pas trouvé de package capable de le faire, j'ai donc décidé de le créer moi-même.
Même si je le fais moi-même, je viens d'ajouter une petite fonction au Pipeline existant. Cette fois, nous avons adopté Kedro pour Pipeline et Papermill pour incorporer Notebook tel quel.
J'ai choisi Kedro parce que Pipeline a une structure simple (il suffit d'écrire des fonctions et des entrées / sorties), et la documentation est complète et le coût d'apprentissage semble faible. Les coûts d'apprentissage sont assez importants pour faire des suggestions à quelqu'un. De plus, comme le dit M. Kinuit, le logo [^ kinuit-kedro] est cool.
Il existe trois caractéristiques principales.
La figure ci-dessous est une visualisation du projet Hello World de Kedro avec Kedro-Viz, où le rectangle représente la fonction et le carré arrondi représente les données. C'est une image que chacun de ces rectangles devient un cahier.
Le YAML de Pipeline s'écrit comme suit. Par exemple, la sortie de split_data
ʻexample_train_x est l'entrée de
train_model`, qui représente le flux (flèche) de Pipeline.
conf/base/pipelines.yml
# data_engineering pipeline
data_engineering:
# split_data node
split_data:
nb:
input_path: notebooks/data_engineering/split_data.ipynb
parameters:
test_data_ratio: 0.2
inputs:
- example_iris_data
outputs:
- example_train_x
- example_train_y
- example_test_x
- example_test_y
# data_science pipeline
data_science:
# train_model node
train_model:
nb:
input_path: notebooks/data_science/train_model.ipynb
parameters:
num_iter: 10000
lr: 0.01
versioned: True
inputs:
- example_train_x
- example_train_y
outputs:
- example_model
# predict node
predict:
nb:
input_path: notebooks/data_science/predict.ipynb
versioned: True
inputs:
- example_model
- example_test_x
outputs:
- example_predictions
# report_accuracy node
report_accuracy:
nb:
input_path: notebooks/data_science/report_accuracy.ipynb
versioned: True
inputs:
- example_predictions
- example_test_y
Par exemple, si vous écrivez pipelines.yml
comme suit, la destination de sortie de Notebook sera data / 08_reporting / train_model # num_iter = 10000 & lr = 0.01.ipynb / <version> / train_model # num_iter = 10000 & lr = 0.01.ipynb
. Sera. Où «
conf/base/pipelines.yml
# data_science pipeline
data_science:
# train_model node
train_model:
nb:
input_path: notebooks/data_science/train_model.ipynb
parameters:
num_iter: 10000
lr: 0.01
versioned: True
inputs:
- example_train_x
- example_train_y
outputs:
- example_model
Je n'ai pas encore pu le maintenir correctement ... Le flux général est le suivant.
Créez un environnement à partir du projet modèle avec la commande suivante.
$ git clone https://github.com/hrappuccino/kedro-notebook-project.git
$ cd kedro-notebook-project
$ pipenv install
$ pipenv shell
Enregistrez toutes les données qui apparaissent dans Pipeline (y compris les produits intermédiaires) dans le catalogue de données.
conf/base/catalog.yaml
example_iris_data:
type: pandas.CSVDataSet
filepath: data/01_raw/iris.csv
example_train_x:
type: pickle.PickleDataSet
filepath: data/05_model_input/example_train_x.pkl
example_train_y:
type: pickle.PickleDataSet
filepath: data/05_model_input/example_train_y.pkl
example_test_x:
type: pickle.PickleDataSet
filepath: data/05_model_input/example_test_x.pkl
example_test_y:
type: pickle.PickleDataSet
filepath: data/05_model_input/example_test_y.pkl
example_model:
type: pickle.PickleDataSet
filepath: data/06_models/example_model.pkl
example_predictions:
type: pickle.PickleDataSet
filepath: data/07_model_output/example_predictions.pkl
Veuillez vous référer aux Documents de Kedro pour savoir comment écrire un catalogue de données.
En gros, vous pouvez créer un cahier comme d'habitude, mais seuls les deux suivants sont différents de l'ordinaire.
--Utilisez le catalogue de données de Kedro pour l'entrée et la sortie de données --Parameterize for Papermill
Lancez Jupyter Notebook / Lab depuis Kedro.
$ kedro jupyter notebook
$ kedro jupyter lab
Exécutez la commande magique suivante dans le bloc-notes. Vous pouvez maintenant utiliser la variable globale catalog
.
%reload_kedro
Pour lire / enregistrer des données, écrivez comme suit.
data = catalog.load('example_iris_data')
catalog.save('example_train_x', train_x)
De plus, comment faire fonctionner Kedro avec Jupyter est Kedro Documents Prière de se référer à.
Pour paramétrer un notebook, marquez la cellule avec parameters
.
Veuillez vous référer à la documentation du moulin à papier pour savoir comment procéder.
Écrivez Pipeline dans le YAML suivant (republié ci-dessus).
conf/base/pipelines.yaml
# data_engineering pipeline
data_engineering:
# split_data node
split_data:
nb:
input_path: notebooks/data_engineering/split_data.ipynb
parameters:
test_data_ratio: 0.2
inputs:
- example_iris_data
outputs:
- example_train_x
- example_train_y
- example_test_x
- example_test_y
# data_science pipeline
data_science:
# train_model node
train_model:
nb:
input_path: notebooks/data_science/train_model.ipynb
parameters:
num_iter: 10000
lr: 0.01
versioned: True
inputs:
- example_train_x
- example_train_y
outputs:
- example_model
# predict node
predict:
nb:
input_path: notebooks/data_science/predict.ipynb
versioned: True
inputs:
- example_model
- example_test_x
outputs:
- example_predictions
# report_accuracy node
report_accuracy:
nb:
input_path: notebooks/data_science/report_accuracy.ipynb
versioned: True
inputs:
- example_predictions
- example_test_y
Exécutez tout / partie de Pipeline.
$ kedro run
$ kedro run --pipeline=data_engineering
Si vous spécifiez l'option --parallel
, il traitera les parties qui peuvent être parallélisées en parallèle.
$ kedro run --parallel
Pour d'autres instructions sur la façon d'exécuter Pipeline, veuillez consulter la documentation de Kedro.
Exécutez la commande suivante pour accéder à http: //127.0.0.1: 4141 /
et la page ci-dessous s'affichera.
$ kedro viz
Exécutez la commande suivante pour accéder à http: //127.0.0.1: 5000 /
et la page ci-dessous s'affichera.
$ mlflow ui
- Remarque: * Depuis que je l'ai exécuté sur un ordinateur portable, une seule expérience est enregistrée en deux lignes.
Kedro + MLflow est également présenté dans le Blog de Kedro.
Je vais expliquer brièvement comment cela fonctionne.
Pour être précis, j'exécute Notebook en utilisant Papermill dans une fonction.
À l'extrême, tout ce que vous avez à faire est d'exécuter pm.execute_notebook
, mais pour séparer les arguments Notebook et Pipeline, nous les transformons en classes et les recevons avec __init __
et __call__
. Au début, je l'ai implémenté avec une fermeture, mais j'étais en colère qu'il ne puisse pas être sérialisé lors du traitement en parallèle, alors j'en ai fait une classe.
__get_default_output_path
est un processus de gestion de version de la sortie Notebook par Papermill, qui sera décrit en détail plus tard.
src/kedro_local/nodes/nodes.py
import papermill as pm
from pathlib import Path
import os, re, urllib, datetime
DEFAULT_VERSION = datetime.datetime.now().isoformat(timespec='milliseconds').replace(':', '.') + 'Z'
def _extract_dataset_name_from_log(output_text):
m = re.search('kedro.io.data_catalog - INFO - Saving data to `(\\w+)`', output_text)
return m.group(1) if m else None
class NotebookExecuter:
def __init__(self, catalog, input_path, output_path=None, parameters=None, versioned=False, version=DEFAULT_VERSION):
self.__catalog = catalog
self.__input_path = input_path
self.__parameters = parameters
self.__versioned = versioned
self.__version = version
self.__output_path = output_path or self.__get_default_output_path()
def __call__(self, *args):
nb = pm.execute_notebook(self.__input_path, self.__output_path, self.__parameters)
dataset_names = [
_extract_dataset_name_from_log(output['text'])
for cell in nb['cells'] if 'outputs' in cell
for output in cell['outputs'] if 'text' in output
]
return {dataset_name: self.__catalog.load(dataset_name) for dataset_name in dataset_names if dataset_name}
def __get_default_output_path(self):
#Voir ci-dessous
Lisez le YAML ci-dessus et créez un pipeline.
Fondamentalement, je convertis simplement YAML en un objet en notation d'inclusion de dictionnaire.
Le dernier Pipeline de __default__
est exécuté lorsque l'option --pipeline
est omise dans kedro run
.
src/kedro_notebook_project/pipeline.py
from kedro.pipeline import Pipeline, node
from kedro_local.nodes import *
import yaml
def create_pipelines(catalog, **kwargs):
with open('conf/base/pipelines.yml') as f:
pipelines_ = yaml.safe_load(f)
pipelines = {
pipeline_name: Pipeline([
node(
NotebookExecuter(catalog, **node_['nb']),
node_['inputs'] if 'inputs' in node_ else None,
{output: output for output in node_['outputs']} if 'outputs' in node_ else None,
name=node_name,
) for node_name, node_ in nodes_.items()
]) for pipeline_name, nodes_ in pipelines_.items()
}
for pipeline_ in list(pipelines.values()):
if '__default__' not in pipelines:
pipelines['__default__'] = pipeline_
else:
pipelines['__default__'] += pipeline_
return pipelines
Il réécrit simplement la destination de sortie selon la définition de pipelines.yml
. Notez que si «self .__ parameters» est volumineux, le nom du fichier sera trop long. Il était auparavant haché, mais comme il n'est pas convivial, il est provisoirement converti en chaîne de requête.
src/kedro_local/nodes/nodes.py
class NotebookExecuter:
#réduction
def __get_default_output_path(self):
name, ext = os.path.splitext(os.path.basename(self.__input_path))
if self.__parameters:
name += '#' + urllib.parse.urlencode(self.__parameters)
name += ext
output_dir = Path(os.getenv('PAPERMILL_OUTPUT_DIR', ''))
if self.__versioned:
output_dir = output_dir / name / self.__version
output_dir.mkdir(parents=True, exist_ok=True)
return str(output_dir / name)
Merci d'avoir lu jusqu'ici. Tout le code source présenté dans cet article se trouve sur My GitHub. Si vous êtes intéressé, veuillez l'utiliser et nous faire part de vos commentaires.
Recommended Posts