Cloud Dataflow est souvent utilisé dans les pipelines ETL sur GCP, mais pour rappel, j'ai essayé de l'utiliser non seulement pour le prétraitement, mais aussi pour l'exécution de pipeline, y compris l'apprentissage automatique.
Je veux être en mesure d'exécuter facilement un grand nombre de scripts d'apprentissage / de prédiction qui ont été déplacés à petite échelle à portée de main lorsqu'une grande quantité d'essais et d'erreurs d'apprentissage est effectuée dans un environnement distribué. Il était gênant de modifier le script d'apprentissage / de prédiction que j'exécutais pour qu'il fonctionne à distance à chaque fois, et de configurer une machine et de la disperser. Je souhaite également pouvoir passer d'un prétraitement tel que la génération d'attributs à l'apprentissage et à l'évaluation en un seul passage. Si le code de la partie de prétraitement et d'apprentissage est divisé, le modèle de prédiction ne peut être reproduit que si les versions de code et de données intermédiaires sont soigneusement gérées, et s'il est implémenté sous forme de pipeline, il peut être incorporé dans le système. Parce que cela semble facile.
Installez les éléments suivants sur une machine en état de marche qui exécute le pipeline à portée de main ou soumet une tâche au cloud. (Notez que Python ne prend en charge que 2 systèmes pour le moment lors de l'exécution dans le cloud)
python
git clone https://github.com/apache/beam.git
cd beam/sdks/python/
python setup.py sdist
cd dist/
pip install apache-beam-sdk-*.tar.gz
python
pip install --upgrade google-cloud-dataflow
J'ai respectivement 0.6.0 et 0.5.5 installés dans mon environnement. Après cela, installez les bibliothèques telles que scicit-learn et pandas qui sont nécessaires pour s'exécuter dans votre environnement.
Examinons ici le pipeline d'apprentissage / prédiction hypothétique suivant à l'aide de pandas et de scicit-learn déjà installés dans l'environnement d'exécution de Dataflow.
Ici, les données sont créées à l'avance pour l'entraînement et l'évaluation, puis placées dans BigQuery. On suppose que les hyper-paramètres ont été décidés et que vous souhaitez évaluer un grand nombre de modèles de prédiction à la fois. On suppose que le modèle d'apprentissage sera réappris chaque année afin de faire face à la détérioration du modèle d'apprentissage au fil du temps.
L'explication continuera en utilisant les données acquises par la requête suivante comme exemple.
python
SELECT year,date,shop_id,sales,attr1,attr2,attr3
FROM dataset.table
On suppose que shop_id est la clé unique du magasin, sales est la variable objectif et attr1-3 est l'attribut.
Ci-dessous, nous allons entrer les éléments de réglage de Pipeline.
réglage des options
import apache_beam as beam
import apache_beam.transforms.window as window
options = beam.utils.pipeline_options.PipelineOptions()
google_cloud_options = options.view_as(beam.utils.pipeline_options.GoogleCloudOptions)
google_cloud_options.project = '{YOUR_PROJECT}'
google_cloud_options.job_name = 'sklearn'
google_cloud_options.staging_location = 'gs://{YOUR_BUCKET}/binaries'
google_cloud_options.temp_location = 'gs://{YOUR_BUCKET}/temp'
worker_options = options.view_as(beam.utils.pipeline_options.WorkerOptions)
worker_options.max_num_workers = 10
#options.view_as(beam.utils.pipeline_options.StandardOptions).runner = 'DirectRunner'
options.view_as(beam.utils.pipeline_options.StandardOptions).runner = 'DataflowRunner'
pipeline = beam.Pipeline(options=options)
Dans Google Cloud Options, nous décrirons les paramètres à exécuter sur GCP. Spécifiez l'emplacement du fichier exécutable ou du fichier temporaire avec staging_location ou temp_location.
Options du travailleur définit le travailleur. Par défaut, GCP déterminera automatiquement la configuration en fonction de la charge. (Le document de la version japonaise indique que Python n'est pas pris en charge, mais la version anglaise indique qu'il est pris en charge) Même lorsque la mise à l'échelle automatique est activée, vous pouvez limiter l'échelle en spécifiant le nombre maximal de nœuds de calcul avec max_num_worker.
Les options standard spécifient l'environnement dans lequel le pipeline s'exécute. Si DirectRunner est spécifié, il s'exécutera dans l'environnement concerné et si DataflowRunner est spécifié, il s'exécutera sur GCP. Il semble bon de vérifier le fonctionnement d'une petite charge de travail à portée de main et de l'exécuter sur le cloud s'il n'y a pas de problème.
Il existe de nombreux autres paramètres d'options, consultez l'aide de la ligne de commande et les Commentaires source Je peux le faire.
Le pipeline est décrit en connectant chaque processus dans l'ordre avec un opérateur de tuyau.
Pipeline
(pipeline
| "Query data" >> beam.Read(beam.io.BigQuerySource(query=query))
| "Assign time" >> beam.Map(assign_timevalue)
| "Set window" >> beam.WindowInto(window.SlidingWindows(size=3, period=1))
| "Assign group key" >> beam.Map(lambda v: (v["shop_id"], v))
| "Group by group key and time window" >> beam.GroupByKey()
| "Learn and predict" >> beam.FlatMap(learn_predict)
| "Write predict data" >> beam.Write(beam.io.BigQuerySink('dataset.table',
schema="shop_id:STRING, date:STRING, predict:FLOAT, sales:INTEGER",
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)))
pipeline.run()
Lors de la première étape, les données sont lues en spécifiant la requête de BigQuery. La deuxième étape spécifie la valeur de référence qui détermine la plage de la fenêtre pour l'étape suivante. Cette fois, les données sont divisées par année, alors spécifiez la colonne qui indique l'année de chaque donnée (les détails seront décrits plus loin). La troisième étape spécifie la largeur et l'espacement des fenêtres. Cette fois, la largeur est de 3 ans (2 ans d'apprentissage, 1 an de prévision), et elle est décalée d'un an, donc définissez size = 3, period = 1. Les fenêtres coulissantes sont une fenêtre de glissement, mais il en existe de nombreuses autres telles que les fenêtres fixes pour la correction et les sessions pour les sessions. Dans la 4ème étape, l'ID de magasin est spécifié comme la clé que vous souhaitez spécifier pour le groupe. Il sera regroupé par la fenêtre et la clé (mémoriser) spécifiées précédemment à la 5ème étape. La sixième étape effectue un traitement d'apprentissage / prédiction pour chaque donnée groupée et renvoie le résultat de la prédiction. La raison de l'utilisation de FlatMap est que les données agrégées pour chaque fenêtre de magasin x sont redistribuées et renvoyées quotidiennement. À la 7e étape, le résultat de la prédiction quotidienne est enregistré dans BigQuery. Le pipeline est exécuté lorsque le pipeline de la dernière étape est exécuté.
Ensuite, jetons un œil à chaque fonction.
Une fonction qui renvoie une valeur pour fractionner une fenêtre
def assign_timevalue(v):
import apache_beam.transforms.window as window
return window.TimestampedValue(v, v["year"])
Pour spécifier la valeur à utiliser dans la fenêtre, remplacez la valeur par TimestampedValue. Le premier TimestampedValue est la valeur et le second est la valeur utilisée dans la fenêtre. La mise en garde ici est que vous devez spécifier l'importation pour référencer les packages et les modules dans la fonction. Il n'y a pas de problème si vous le déplacez à portée de main, mais sur le cloud, cette fonction est distribuée et exécutée par les nœuds worker. Vous devez importer le package pour qu'il fonctionne même dans l'environnement du nœud worker. Veuillez noter que les variables définies globalement ne sont pas accessibles sur le cloud.
Fonction de prédiction d'apprentissage
def learn_predict(records):
import pandas as pd
from sklearn.ensemble import GradientBoostingRegressor
target_attr = 'sales'
learn_attrs = ['attr1', 'attr2', 'attr3']
data = pd.DataFrame(records[1])
data[learn_attrs] = data[learn_attrs].apply(pd.to_numeric)
data = data.fillna(0.)
if len(data["year"].unique()) < 3:
return [] #Ne rien faire pour les combinaisons de moins de 3 ans
year_max = data["year"].max()
train = data[data["year"] < year_max] #Il y a 2 ans pour apprendre
test = data[data["year"] == year_max] #L'année prochaine est consacrée à l'évaluation des prévisions
model = GradientBoostingRegressor()
model.fit(train[learn_attrs], train[target_attr])
test.loc[:, "predict"] = model.predict(test[learn_attrs])
return test[["shop_id","date","predict","sales"]].to_dict(orient='records')
Étant donné que les données sont transmises à la fonction d'apprentissage / prédiction sous forme de taple avec la clé (ID de magasin) et la liste au format dictionnaire comme valeur, la liste de valeurs est convertie en une Dataframe pour l'apprentissage / la prédiction. Dans la dernière ligne, la conversion est effectuée pour transmettre le résultat sous forme de liste de formats de dictionnaire à l'insertion de BigQuery dans la dernière partie.
Lorsque le pipeline est exécuté de cette manière, le résultat de la prédiction et les données de réponse correctes sont saisis dans BigQuery, il est donc possible de calculer et d'évaluer des indicateurs tels que RMSE à partir de différentes perspectives telles que le magasin et l'année par SQL.
Exécuter le processus d'apprentissage sur Dataflow peut être une mauvaise idée de l'objectif du service, mais j'ai pu le déplacer. Cette fois, il s'agissait d'un simple pipeline à sens unique qui apprend et prédit à partir des données créées dans BigQuery et enregistre le résultat, mais vous pouvez ajouter le traitement des données, etc., transmettre les données d'évaluation d'un autre flux et le résultat. Il semble qu'il puisse être modifié de manière flexible en branchant le résultat de la prédiction et le modèle de prédiction et en le transmettant à l'étape suivante. On a supposé que les hyper paramètres avaient déjà été décidés cette fois, mais je voudrais essayer l'exécution parallèle en masse du réglage des paramètres.
Cloud Dataflow est un service qui n'a pas reçu beaucoup d'attention dans GCP, mais personnellement, Dataflow gère la construction et le fonctionnement du flux de données, ce qui a tendance à être gênant pour les applications qui gèrent un traitement de données complexe tel que l'apprentissage automatique. S'attend à être comme AppEngine pour les applications d'analyse de données.
Cette fois, j'ai utilisé scicit-learn qui est installé en standard dans Dataflow, mais en réalité, vous voudrez utiliser diverses bibliothèques. La prochaine fois, j'aimerais décrire la procédure d'installation de toute bibliothèque en utilisant l'installation de xgboost comme exemple.
Documentation officielle
La trilogie de technologie de traitement de données Google facile à comprendre de M. Nakai
Plateforme de traitement de données distribuée FlumeJava avec MapReduce comme backend
Principes de base du traitement en continu appris de "Mill Wheel"
Design pattern of streaming processing réalisé par "Cloud Dataflow"
Code associé
Exemple de traitement distribué de TensorFlow Prediction avec Cloud Dataflow
Classification d'image spéciale par Cloud Machine Learning et Cloud Dataflow
Recommended Posts