Nous orchestrerons une série de tâches lors de l'exécution du machine learning suivant à l'aide de Cloud Composer de Google Cloud Platform.
Le nœud et le workflow sur Airflow à créer sont indiqués dans la figure ci-dessous.
Les tâches Airflow ci-dessus peuvent être exprimées par les services GCP comme suit.
La commande bash ci-dessous fait trois choses.
Une chose à laquelle il faut faire attention lors de la création de l'environnement Cloud Composer est de spécifier --python-version 3
comme argument. Par défaut, la série python2 est définie.
Dans la liste des tâches affichée au début, il y avait un endroit pour publier un message dans Slack. Vous devez installer la bibliothèque slackclient
dans Airflow pour effectuer cette tâche.
Spécifiez le fichier de configuration de la bibliothèque dans l'argument --update-pypi-packages-from-file
.
requirements.txt
slackclient~=1.3.2
Comme mentionné ci-dessus, ʻacccess_token est requis lors de l'envoi d'un message à slack en utilisant la bibliothèque slackclient, il est donc pratique de définir ʻaccess_token
dans la variable d'environnement de flux d'air, donc définissez-le à l'avance. (Ce n'est pas si bon de solidifier ʻaccess_token` dans le fichier dag)
#!/usr/bin/env bash
ENVIRONMENT_NAME=dev-composer
LOCATION=us-central1
#Lire les variables
eval `cat airflow/config/.secrets.conf`
echo ${slack_access_token}
#Créer un environnement pour Cloud Composer
gcloud composer environments create ${ENVIRONMENT_NAME} \
--location ${LOCATION} \
--python-version 3
#Installez la bibliothèque dans l'environnement de flux d'air
gcloud composer environments update ${ENVIRONMENT_NAME} \
--update-pypi-packages-from-file airflow/config/requirements.txt \
--location ${LOCATION}
#Définir les variables d'environnement sur le flux d'air
gcloud composer environments run \
--location=${LOCATION} \
${ENVIRONMENT_NAME} \
variables -- \
--set slack_access_token ${slack_access_token} project_id ${project_id}
Le fichier dag créé cette fois est le suivant. Cela ne suffit pas à expliquer, je vais donc l'expliquer en séparant le code pour chaque tâche.
import os
import airflow
import datetime
from airflow.models import Variable
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.contrib.operators.dataflow_operator import DataFlowPythonOperator
from airflow.operators.slack_operator import SlackAPIPostOperator
from airflow import configuration
from airflow.utils.trigger_rule import TriggerRule
from airflow.contrib.operators.mlengine_operator \
import MLEngineTrainingOperator, MLEngineBatchPredictionOperator
BUCKET = 'gs://your_bucket'
PROJECT_ID = Variable.get('project_id')
REGION = 'us-central1'
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)
PACKAGE_URI = BUCKET + '/code/trainer-0.1.tar.gz'
OUTDIR = BUCKET + '/trained_model'
DATAFLOW_TRAIN_FILE = os.path.join(
configuration.get('core', 'dags_folder'),
'dataflow', 'extract_train_data.py')
DATAFLOW_PRED_FILE = os.path.join(
configuration.get('core', 'dags_folder'),
'dataflow', 'extract_pred_data.py')
DATAFLOW_LOAD_FILE = os.path.join(
configuration.get('core', 'dags_folder'),
'dataflow', 'load.py')
DEFAULT_ARGS = {
'owner': 'Composer Example',
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'start_date': YESTERDAY,
'project_id': Variable.get('project_id'),
'dataflow_default_options': {
'project': Variable.get('project_id'),
'temp_location': 'gs://your_composer_bucket/temp',
'runner': 'DataflowRunner'
}
}
def get_date():
jst_now = datetime.datetime.now()
dt = datetime.datetime.strftime(jst_now, "%Y-%m-%d")
return dt
with airflow.DAG(
'asl_ml_pipeline',
'catchup=False',
default_args=DEFAULT_ARGS,
schedule_interval=datetime.timedelta(days=1)) as dag:
start = DummyOperator(task_id='start')
####
#Tâches de formation sur ML Engine
####
job_id = 'dev-train-{}'.\
format(datetime.datetime.now().strftime('%Y%m%d%H%M'))
job_dir = BUCKET + '/jobs/' + job_id
submit_train_job = MLEngineTrainingOperator(
task_id='train-model',
project_id=PROJECT_ID,
job_id=job_id,
package_uris=[PACKAGE_URI],
region=REGION,
training_python_module='trainer.task',
training_args=[f'--output_dir={OUTDIR}',
f'--job_dir={job_dir}',
'--dropout_rate=0.5',
'--batch_size=128',
'--train_step=1'
],
scale_tier='BASIC_GPU',
python_version='3.5'
)
####
#Tâche de déploiement du modèle
####
BASE_VERSION_NAME = 'v1_0'
VERSION_NAME = '{0}_{1}'.\
format(BASE_VERSION_NAME, datetime.datetime.now().strftime('%Y_%m_%d'))
MODEL_NAME = 'dev_train'
deploy_model = BashOperator(
task_id='deploy-model',
bash_command='gcloud ml-engine versions create '
'{{ params.version_name}} '
'--model {{ params.model_name }} '
'--origin $(gsutil ls gs://your_bucket/trained_model/export/exporter | tail -1) '
'--python-version="3.5" '
'--runtime-version=1.14 ',
params={'version_name': VERSION_NAME,
'model_name': MODEL_NAME}
)
####
#Tâche de prédiction par lots avec ML Engine
####
today = get_date()
input_path = BUCKET + f'/preprocess/{today}/prediction/prediction-*'
output_path = BUCKET + f'/result/{today}/'
batch_prediction = MLEngineBatchPredictionOperator(
task_id='batch-prediction',
data_format='TEXT',
region=REGION,
job_id=job_id,
input_paths=input_path,
output_path=output_path,
model_name=MODEL_NAME,
version_name=VERSION_NAME
)
####
#Tâche d'extraction de données avec DataFlow
####
job_args = {
'output': 'gs://your_bucket/preprocess'
}
create_train_data = DataFlowPythonOperator(
task_id='create-train-data',
py_file=DATAFLOW_TRAIN_FILE,
options=job_args
)
create_pred_data = DataFlowPythonOperator(
task_id='create-pred-data',
py_file=DATAFLOW_PRED_FILE,
options=job_args
)
####
#Tâche de chargement de données dans BigQuery avec DataFlow
####
load_results = DataFlowPythonOperator(
task_id='load_pred_results',
py_file=DATAFLOW_LOAD_FILE
)
post_success_slack_train = SlackAPIPostOperator(
task_id='post-success-train-to-slack',
token=Variable.get('slack_access_token'),
text='Train is succeeded',
channel='#feed'
)
post_fail_slack_train = SlackAPIPostOperator(
task_id='post-fail-train-to-slack',
token=Variable.get('slack_access_token'),
trigger_rule=TriggerRule.ONE_FAILED,
text='Train is failed',
channel='#feed'
)
####
#Tâche pour POSTER un message à Slack
####
post_success_slack_pred = SlackAPIPostOperator(
task_id='post-success-pred-to-slack',
token=Variable.get('slack_access_token'),
text='Prediction is succeeded',
channel='#feed'
)
post_fail_slack_pred = SlackAPIPostOperator(
task_id='post-fail-pred-to-slack',
token=Variable.get('slack_access_token'),
trigger_rule=TriggerRule.ONE_FAILED,
text='Prediction is failed',
channel='#feed'
)
end = DummyOperator(task_id='end')
start >> [create_train_data, create_pred_data] >> submit_train_job \
>> [post_fail_slack_train, post_success_slack_train]
post_fail_slack_train >> end
post_success_slack_train >> deploy_model >> batch_prediction \
>> load_results \
>> [post_success_slack_pred, post_fail_slack_pred] >> end
C'est la première tâche que nous effectuons dans la phase de formation. (Cadre rouge dans la figure ci-dessous) Les données sont extraites de BigQuery à l'aide de DataFlow et placées dans un bucket approprié de GCS.
--Constant DATAFLOW_TRAIN_FILE
/ DATAFLOW_PRED_FILE
dags
dans le Bucket
créés lors de la création de l'environnement Cloud Composer sont synchronisés par l'agent de flux d'air toutes les quelques secondes.--Constant DEFAULT_ARGS
--Définissez les variables d'environnement lors de l'exécution de DataFlow dans l'argument de dataflow_default_options
.
DataFlowPythonOperator
--Classe pour exécuter JOB en utilisant DataFlow comme fichier pythonpy_file
a le chemin où se trouve le fichier exécutable.
--ʻOptions` Spécifiez l'argument à passer au fichier d'exécution dans l'argument
--Cette fois, le chemin du fichier pour placer les données dans GCS est spécifié.
DATAFLOW_TRAIN_FILE = os.path.join(
configuration.get('core', 'dags_folder'),
'dataflow', 'extract_train_data.py')
DATAFLOW_PRED_FILE = os.path.join(
configuration.get('core', 'dags_folder'),
'dataflow', 'extract_pred_data.py')
DEFAULT_ARGS = {
'owner': 'Composer Example',
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'start_date': YESTERDAY,
'project_id': Variable.get('project_id'),
'dataflow_default_options': {
'project': Variable.get('project_id'),
'temp_location': 'gs://your_composer_bucket/temp',
'runner': 'DataflowRunner'
}
}
####
#Tâche d'extraction de données avec DataFlow
####
#Chemin du fichier lors de la mise des données dans GCS
job_args = {
'output': 'gs://your_bucket/preprocess'
}
create_train_data = DataFlowPythonOperator(
task_id='create-train-data',
py_file=DATAFLOW_TRAIN_FILE,
options=job_args
)
create_pred_data = DataFlowPythonOperator(
task_id='create-pred-data',
py_file=DATAFLOW_PRED_FILE,
options=job_args
)
Le fichier suivant est un processus à diviser en «données de train» et «données de test» et à les mettre dans GCS pour la formation. (Aussi, dans un souci de simplification de l'article, je vais omettre l'explication de la requête pour diviser en "données de train" et "données de test". Le hachage est effectué en supposant qu'il existe une colonne d'horodatage, et le reste après la division (Divisé par la valeur de)
Il y a deux points à prendre en compte ici.
--Convertir en fichier CSV
to_csv
.--Version Python de DataFlow
f-strings
ajoutée à partir de 3.6 ne peut pas être utilisée.import os
import argparse
import logging
from datetime import datetime
import apache_beam as beam
from apache_beam.options.pipeline_options import \
PipelineOptions
PROJECT = 'your_project_id'
def create_query(phase):
base_query = """
SELECT
*,
MOD(ABS(FARM_FINGERPRINT(CAST(timestamp AS STRING))), 10) AS hash_value
FROM
`dataset.your_table`
"""
if phase == 'TRAIN':
subsumple = """
hash_value < 7
"""
elif phase == 'TEST':
subsumple = """
hash_value >= 7
"""
query = """
SELECT
column1,
column2,
column3,
row_number()over() as key
FROM
({0})
WHERE {1}
""".\
format(base_query, subsumple)
return query
def to_csv(line):
csv_columns = 'column1,column2,column3,key'.split(',')
rowstring = ','.join([str(line[k]) for k in csv_columns])
return rowstring
def get_date():
jst_now = datetime.now()
dt = datetime.strftime(jst_now, "%Y-%m-%d")
return dt
def run(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument(
'--output',
required=True
)
known_args, pipeline_args = \
parser.parse_known_args(argv)
options = PipelineOptions(pipeline_args)
with beam.Pipeline(options=options) as p:
for phase in ['TRAIN', 'TEST']:
query = create_query(phase)
date = get_date()
output_path = os.path.join(known_args.output, date,
'train', "{}".format(phase))
read = p | 'ExtractFromBigQuery_{}'.format(phase) >> beam.io.Read(
beam.io.BigQuerySource(
project=PROJECT,
query=query,
use_standard_sql=True
)
)
convert = read | 'ConvertToCSV_{}'.format(phase) >> beam.Map(to_csv)
convert | 'WriteToGCS_{}'.format(phase) >> beam.io.Write(
beam.io.WriteToText(output_path, file_name_suffix='.csv'))
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
Ici, nous n'expliquerons pas les tâches d'entraînement à l'aide de ML Engine et de déploiement du modèle entraîné. De plus, cette fois, j'omettrai l'explication du fichier d'exécution «task.py» et du fichier modèle «model.py» utilisé dans ML Engine.
Nous préparons cette fois un seau pour ML Engine. Par conséquent, veuillez noter qu'un total de deux buckets est utilisé correctement lorsqu'ils sont combinés avec le bucket créé lors de la création de l'environnement Cloud Composer.
.
├── your_bucket
│ ├── code //Modèle requis pour l'apprentissage.py et tâche.Mettez un fichier gz avec py etc.
│ │
│ └── trainde_model //Le fichier de modèle entraîné est placé
│
└── your_composer_bucket //Bucket créé lors de la création de l'environnement Cloud Composer
--Constant PACKAGE_URI
Chemin du fichier où se trouve le fichier exécutable de ML Engine
Cette fois, le fichier gz contenant trainer.py
et model.py
est placé sous le gs: // your_bucket / code
ci-dessus.
--Cette constante est spécifiée dans l'argument package_uris
de la classe MLEngineTrainingOperator
.
ClasseBashOperator
--Classe pour exécuter la commande bash
Cette fois, la commande bash est utilisée lors du déploiement du fichier de modèle entraîné.
Le modèle est déployé en exécutant gcloud ml-engine versions create
.
(Probablement) Vous pouvez faire la même chose avec la classe MLEngineVersionOperator
, mais cette fois j'ai utilisé la commande bash.
PACKAGE_URI = BUCKET + '/code/trainer-0.1.tar.gz'
OUTDIR = BUCKET + '/trained_model'
job_id = 'dev-train-{}'.\
format(datetime.datetime.now().strftime('%Y%m%d%H%M'))
job_dir = BUCKET + '/jobs/' + job_id
submit_train_job = MLEngineTrainingOperator(
task_id='train-model',
project_id=PROJECT_ID,
job_id=job_id,
package_uris=[PACKAGE_URI],
region=REGION,
training_python_module='trainer.task',
training_args=[f'--output_dir={OUTDIR}',
f'--job_dir={job_dir}',
'--dropout_rate=0.5',
'--batch_size=128',
'--train_step=1'
],
scale_tier='BASIC_GPU',
python_version='3.5'
)
today = get_date()
BASE_VERSION_NAME = 'v1_0'
VERSION_NAME = '{0}_{1}'.\
format(BASE_VERSION_NAME, datetime.datetime.now().strftime('%Y_%m_%d'))
MODEL_NAME = 'dev_model'
deploy_model = BashOperator(
task_id='deploy-model',
bash_command='gcloud ml-engine versions create '
'{{ params.version_name}} '
'--model {{ params.model_name }} '
'--origin $(gsutil ls gs://your_bucket/trained_model/export/exporter | tail -1) '
'--python-version="3.5" '
'--runtime-version=1.14 ',
params={'version_name': VERSION_NAME,
'model_name': MODEL_NAME}
)
Ici, nous allons expliquer la tâche de faire une prédiction par lots en utilisant le modèle déployé précédemment.
--Constant ʻinput_path`
--Constant ʻoutput_path`
input_path = BUCKET + f'/preprocess/{today}/prediction/prediction-*'
output_path = BUCKET + f'/result/{today}/'
batch_prediction = MLEngineBatchPredictionOperator(
task_id='batch-prediction',
data_format='TEXT',
region=REGION,
job_id=job_id,
input_paths=input_path,
output_path=output_path,
model_name=MODEL_NAME,
version_name=VERSION_NAME
)
Cette section décrit la tâche de chargement du résultat de prédiction prévu précédemment dans BigQuery à l'aide de DataFlow.
Pour les fichiers Dag, elle est similaire à la tâche "Extraire les données de BigQuery" décrite au début.
La constante DATAFLOW_LOAD_FILE
spécifie le chemin du fichier GCS où se trouve le fichier exécutable DataFlow.
DATAFLOW_LOAD_FILE = os.path.join(
configuration.get('core', 'dags_folder'),
'dataflow', 'load.py')
load_results = DataFlowPythonOperator(
task_id='load_pred_results',
py_file=DATAFLOW_LOAD_FILE
)
Dans le fichier suivant, le fichier placé dans GCS est lu, converti au format Json et chargé dans une table appropriée de BigQuery. À quoi vous devez faire attention ici
{"key": [0], "prediction: [3.45...]"}
import logging
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
BUCKET_NAME = 'your_bucket'
INPUT = 'gs://{}/result/prediction.results-*'.format(BUCKET_NAME)
def convert(line):
import json
record = json.loads(line)
return {'key': record['key'][0], 'predictions': record['predictions'][0]}
def run(argv=None):
parser = argparse.ArgumentParser()
known_args, pipeline_args = \
parser.parse_known_args(argv)
options = PipelineOptions(pipeline_args)
with beam.Pipeline(options=options) as p:
dataset = 'your_dataset.results'
read = p | 'ReadPredictionResult' >> beam.io.ReadFromText(INPUT)
json = read | 'ConvertJson' >> beam.Map(convert)
json | 'WriteToBigQuery' >> beam.io.Write(beam.io.BigQuerySink(
dataset,
schema='key:INTEGER, predictions:FLOAT',
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
J'utilise généralement AWS, mais depuis que j'ai eu l'occasion de parler des services GCP, j'ai résumé les services liés au ML. J'espère que cela sera utile pour ceux qui s'inquiètent du flux de travail ML.
Il existe un endroit pour déployer le modèle formé avec ML Engine tel quel, mais ce n'est pas recommandé. Nous vous recommandons de créer un mécanisme pour mesurer / comparer la précision du modèle que vous avez appris et de le déployer après avoir pris en sandwich la tâche.
Recommended Posts