Exécutez un pipeline de machine learning avec Cloud Dataflow (Python)

Cloud Dataflow est souvent utilisé dans les pipelines ETL sur GCP, mais pour rappel, j'ai essayé de l'utiliser non seulement pour le prétraitement, mais aussi pour l'exécution de pipeline, y compris l'apprentissage automatique.

Chose que tu veux faire

Je veux être en mesure d'exécuter facilement un grand nombre de scripts d'apprentissage / de prédiction qui ont été déplacés à petite échelle à portée de main lorsqu'une grande quantité d'essais et d'erreurs d'apprentissage est effectuée dans un environnement distribué. Il était gênant de modifier le script d'apprentissage / de prédiction que j'exécutais pour qu'il fonctionne à distance à chaque fois, et de configurer une machine et de la disperser. Je souhaite également pouvoir passer d'un prétraitement tel que la génération d'attributs à l'apprentissage et à l'évaluation en un seul passage. Si le code de la partie de prétraitement et d'apprentissage est divisé, le modèle de prédiction ne peut être reproduit que si les versions de code et de données intermédiaires sont soigneusement gérées, et s'il est implémenté sous forme de pipeline, il peut être incorporé dans le système. Parce que cela semble facile.

Installation

Installez les éléments suivants sur une machine en état de marche qui exécute le pipeline à portée de main ou soumet une tâche au cloud. (Notez que Python ne prend en charge que 2 systèmes pour le moment lors de l'exécution dans le cloud)

python


git clone https://github.com/apache/beam.git
cd beam/sdks/python/
python setup.py sdist
cd dist/
pip install apache-beam-sdk-*.tar.gz

python


pip install --upgrade google-cloud-dataflow

J'ai respectivement 0.6.0 et 0.5.5 installés dans mon environnement. Après cela, installez les bibliothèques telles que scicit-learn et pandas qui sont nécessaires pour s'exécuter dans votre environnement.

Exécutez le pipeline d'apprentissage / de prédiction scikit-learn

Examinons ici le pipeline d'apprentissage / prédiction hypothétique suivant à l'aide de pandas et de scicit-learn déjà installés dans l'environnement d'exécution de Dataflow.

Ici, les données sont créées à l'avance pour l'entraînement et l'évaluation, puis placées dans BigQuery. On suppose que les hyper-paramètres ont été décidés et que vous souhaitez évaluer un grand nombre de modèles de prédiction à la fois. On suppose que le modèle d'apprentissage sera réappris chaque année afin de faire face à la détérioration du modèle d'apprentissage au fil du temps.

L'explication continuera en utilisant les données acquises par la requête suivante comme exemple.

python


SELECT year,date,shop_id,sales,attr1,attr2,attr3
FROM dataset.table

On suppose que shop_id est la clé unique du magasin, sales est la variable objectif et attr1-3 est l'attribut.

option

Ci-dessous, nous allons entrer les éléments de réglage de Pipeline.

réglage des options


import apache_beam as beam
import apache_beam.transforms.window as window

options = beam.utils.pipeline_options.PipelineOptions()

google_cloud_options = options.view_as(beam.utils.pipeline_options.GoogleCloudOptions)
google_cloud_options.project = '{YOUR_PROJECT}'
google_cloud_options.job_name = 'sklearn'
google_cloud_options.staging_location = 'gs://{YOUR_BUCKET}/binaries'
google_cloud_options.temp_location = 'gs://{YOUR_BUCKET}/temp'

worker_options = options.view_as(beam.utils.pipeline_options.WorkerOptions)
worker_options.max_num_workers = 10

#options.view_as(beam.utils.pipeline_options.StandardOptions).runner = 'DirectRunner'
options.view_as(beam.utils.pipeline_options.StandardOptions).runner = 'DataflowRunner'

pipeline = beam.Pipeline(options=options)

Dans Google Cloud Options, nous décrirons les paramètres à exécuter sur GCP. Spécifiez l'emplacement du fichier exécutable ou du fichier temporaire avec staging_location ou temp_location.

Options du travailleur définit le travailleur. Par défaut, GCP déterminera automatiquement la configuration en fonction de la charge. (Le document de la version japonaise indique que Python n'est pas pris en charge, mais la version anglaise indique qu'il est pris en charge) Même lorsque la mise à l'échelle automatique est activée, vous pouvez limiter l'échelle en spécifiant le nombre maximal de nœuds de calcul avec max_num_worker.

Les options standard spécifient l'environnement dans lequel le pipeline s'exécute. Si DirectRunner est spécifié, il s'exécutera dans l'environnement concerné et si DataflowRunner est spécifié, il s'exécutera sur GCP. Il semble bon de vérifier le fonctionnement d'une petite charge de travail à portée de main et de l'exécuter sur le cloud s'il n'y a pas de problème.

Il existe de nombreux autres paramètres d'options, consultez l'aide de la ligne de commande et les Commentaires source Je peux le faire.

Définition du pipeline

Le pipeline est décrit en connectant chaque processus dans l'ordre avec un opérateur de tuyau.

Pipeline


(pipeline
 | "Query data"  >> beam.Read(beam.io.BigQuerySource(query=query))
 | "Assign time" >> beam.Map(assign_timevalue)
 | "Set window"  >> beam.WindowInto(window.SlidingWindows(size=3, period=1))
 | "Assign group key" >> beam.Map(lambda v: (v["shop_id"], v))
 | "Group by group key and time window" >> beam.GroupByKey()
 | "Learn and predict"  >> beam.FlatMap(learn_predict)
 | "Write predict data" >> beam.Write(beam.io.BigQuerySink('dataset.table',
                              schema="shop_id:STRING, date:STRING, predict:FLOAT, sales:INTEGER",
                              write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                              create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)))

pipeline.run()

Lors de la première étape, les données sont lues en spécifiant la requête de BigQuery. La deuxième étape spécifie la valeur de référence qui détermine la plage de la fenêtre pour l'étape suivante. Cette fois, les données sont divisées par année, alors spécifiez la colonne qui indique l'année de chaque donnée (les détails seront décrits plus loin). La troisième étape spécifie la largeur et l'espacement des fenêtres. Cette fois, la largeur est de 3 ans (2 ans d'apprentissage, 1 an de prévision), et elle est décalée d'un an, donc définissez size = 3, period = 1. Les fenêtres coulissantes sont une fenêtre de glissement, mais il en existe de nombreuses autres telles que les fenêtres fixes pour la correction et les sessions pour les sessions. Dans la 4ème étape, l'ID de magasin est spécifié comme la clé que vous souhaitez spécifier pour le groupe. Il sera regroupé par la fenêtre et la clé (mémoriser) spécifiées précédemment à la 5ème étape. La sixième étape effectue un traitement d'apprentissage / prédiction pour chaque donnée groupée et renvoie le résultat de la prédiction. La raison de l'utilisation de FlatMap est que les données agrégées pour chaque fenêtre de magasin x sont redistribuées et renvoyées quotidiennement. À la 7e étape, le résultat de la prédiction quotidienne est enregistré dans BigQuery. Le pipeline est exécuté lorsque le pipeline de la dernière étape est exécuté.

Ensuite, jetons un œil à chaque fonction.

Une fonction qui renvoie une valeur pour fractionner une fenêtre


def assign_timevalue(v):

    import apache_beam.transforms.window as window

    return window.TimestampedValue(v, v["year"])

Pour spécifier la valeur à utiliser dans la fenêtre, remplacez la valeur par TimestampedValue. Le premier TimestampedValue est la valeur et le second est la valeur utilisée dans la fenêtre. La mise en garde ici est que vous devez spécifier l'importation pour référencer les packages et les modules dans la fonction. Il n'y a pas de problème si vous le déplacez à portée de main, mais sur le cloud, cette fonction est distribuée et exécutée par les nœuds worker. Vous devez importer le package pour qu'il fonctionne même dans l'environnement du nœud worker. Veuillez noter que les variables définies globalement ne sont pas accessibles sur le cloud.

Fonction de prédiction d'apprentissage


def learn_predict(records):

    import pandas as pd
    from sklearn.ensemble import GradientBoostingRegressor

    target_attr = 'sales'
    learn_attrs = ['attr1', 'attr2', 'attr3']

    data = pd.DataFrame(records[1])
    data[learn_attrs] = data[learn_attrs].apply(pd.to_numeric)
    data = data.fillna(0.)

    if len(data["year"].unique()) < 3:
        return [] #Ne rien faire pour les combinaisons de moins de 3 ans

    year_max = data["year"].max()
    train = data[data["year"] <  year_max] #Il y a 2 ans pour apprendre
    test  = data[data["year"] == year_max] #L'année prochaine est consacrée à l'évaluation des prévisions

    model = GradientBoostingRegressor()
    model.fit(train[learn_attrs], train[target_attr])
    test.loc[:, "predict"] = model.predict(test[learn_attrs])

    return test[["shop_id","date","predict","sales"]].to_dict(orient='records')

Étant donné que les données sont transmises à la fonction d'apprentissage / prédiction sous forme de taple avec la clé (ID de magasin) et la liste au format dictionnaire comme valeur, la liste de valeurs est convertie en une Dataframe pour l'apprentissage / la prédiction. Dans la dernière ligne, la conversion est effectuée pour transmettre le résultat sous forme de liste de formats de dictionnaire à l'insertion de BigQuery dans la dernière partie.

Lorsque le pipeline est exécuté de cette manière, le résultat de la prédiction et les données de réponse correctes sont saisis dans BigQuery, il est donc possible de calculer et d'évaluer des indicateurs tels que RMSE à partir de différentes perspectives telles que le magasin et l'année par SQL.

à la fin

Exécuter le processus d'apprentissage sur Dataflow peut être une mauvaise idée de l'objectif du service, mais j'ai pu le déplacer. Cette fois, il s'agissait d'un simple pipeline à sens unique qui apprend et prédit à partir des données créées dans BigQuery et enregistre le résultat, mais vous pouvez ajouter le traitement des données, etc., transmettre les données d'évaluation d'un autre flux et le résultat. Il semble qu'il puisse être modifié de manière flexible en branchant le résultat de la prédiction et le modèle de prédiction et en le transmettant à l'étape suivante. On a supposé que les hyper paramètres avaient déjà été décidés cette fois, mais je voudrais essayer l'exécution parallèle en masse du réglage des paramètres.

Cloud Dataflow est un service qui n'a pas reçu beaucoup d'attention dans GCP, mais personnellement, Dataflow gère la construction et le fonctionnement du flux de données, ce qui a tendance à être gênant pour les applications qui gèrent un traitement de données complexe tel que l'apprentissage automatique. S'attend à être comme AppEngine pour les applications d'analyse de données.

Cette fois, j'ai utilisé scicit-learn qui est installé en standard dans Dataflow, mais en réalité, vous voudrez utiliser diverses bibliothèques. La prochaine fois, j'aimerais décrire la procédure d'installation de toute bibliothèque en utilisant l'installation de xgboost comme exemple.

référence

Recommended Posts

Exécutez un pipeline de machine learning avec Cloud Dataflow (Python)
Exécutez XGBoost avec Cloud Dataflow (Python)
Créer un environnement d'apprentissage automatique Python avec des conteneurs
Apprentissage automatique avec Python! Préparation
Commencer avec l'apprentissage automatique Python
Créer un environnement de développement d'applications d'apprentissage automatique avec Python
Apprentissage automatique par python (1) Classification générale
Mémo d'apprentissage "Scraping & Machine Learning avec Python"
Jusqu'à ce que vous créiez un environnement d'apprentissage automatique avec Python sur Windows 7 et que vous l'exécutiez
Exécutez Cloud Dataflow (Python) depuis AppEngine
Amplifiez les images pour l'apprentissage automatique avec Python
Création d'un environnement Windows 7 pour une introduction à l'apprentissage automatique avec Python
Apprentissage automatique avec python (2) Analyse de régression simple
Une histoire sur l'apprentissage automatique avec Kyasuket
[Shakyo] Rencontre avec Python pour l'apprentissage automatique
Exécutez une application Web Python avec Docker
Construction d'environnement AI / Machine Learning avec Python
[Python] Introduction facile à l'apprentissage automatique avec python (SVM)
Apprentissage automatique à partir de Python Personal Memorandum Part2
Apprentissage automatique à partir de Python Personal Memorandum Part1
[Python] Collectez des images avec Icrawler pour l'apprentissage automatique [1000 feuilles]
Touchons une partie de l'apprentissage automatique avec Python
J'ai commencé l'apprentissage automatique avec le prétraitement des données Python
Apprendre Python avec ChemTHEATER 03
Exécutez Python avec VBA
Apprendre Python avec ChemTHEATER 05-1
Exécutez prepDE.py avec python3
Exécutez Blender avec python
Apprendre Python avec ChemTHEATER 02
Apprendre Python avec ChemTHEATER 01
Tutoriel Cloud Run (python)
Exécutez iperf avec python
Un débutant en apprentissage automatique a essayé de créer un modèle de prédiction de courses de chevaux avec python
Créer un environnement Python d'apprentissage automatique sur Mac OS
Sentons-nous comme un chercheur en matériaux avec l'apprentissage automatique
[Python] J'ai créé un classificateur pour les iris [Machine learning]
Résumé du flux de base de l'apprentissage automatique avec Python
Exécuter un fichier Python avec une importation relative dans PyCharm
Mémo de construction d'environnement d'apprentissage automatique par Python
Je veux faire fonctionner un ordinateur quantique avec Python
Créez un environnement d'apprentissage automatique à partir de zéro avec Winsows 10
MALSS (introduction), un outil qui prend en charge l'apprentissage automatique en Python
J'ai essayé de faire une simulation de séparation de source sonore en temps réel avec l'apprentissage automatique Python
Prédire le temps objectif d'un marathon complet avec l'apprentissage automatique-③: j'ai essayé de visualiser les données avec Python-
L'apprentissage automatique appris avec Pokemon
Exécutez python avec PyCharm (Windows)
Exécutez Python avec CloudFlash (arm926ej-s)
Apprentissage amélioré à partir de Python
Faites une loterie avec Python
Démineur d'apprentissage automatique avec PyTorch
Créer un environnement d'apprentissage automatique
Exécuter Label avec tkinter [Python]
Programmation Python Machine Learning> Mots-clés
Traitement itératif Python appris avec ChemoInfomatics
Créer un répertoire avec python
Essayez le machine learning à la légère avec Kaggle