In this article, we orchestrate a series of machine learning tasks using Cloud Composer on Google Cloud Platform.
The nodes and the workflow to be created on Airflow are shown in the figure below.
This article is aimed at:

- Those who have used Cloud Composer / Airflow
- Those who have used ML Engine / Dataflow
The above Airflow tasks map to GCP services as follows.
The bash script below does three things: it creates the Cloud Composer environment, installs an extra library into the Airflow environment, and sets Airflow variables.
One thing to be careful about when building the Cloud Composer environment is to pass --python-version 3
as an argument; by default, the environment is created with the Python 2 series.
The task list shown at the beginning includes steps that post a message to Slack. To run those tasks, the slackclient
library must be installed into the Airflow environment.
Specify the requirements file with the --update-pypi-packages-from-file
argument.
requirements.txt
slackclient~=1.3.2
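For reference, the SlackAPIPostOperator used later relies on this library internally. A hand-written illustration of the slackclient 1.x API (not from the original article; the token and channel are placeholders):

# Illustration only: how slackclient 1.x posts a message.
from slackclient import SlackClient

sc = SlackClient('xoxb-your-access-token')  # placeholder token
sc.api_call('chat.postMessage', channel='#feed', text='Hello from Airflow')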
As mentioned above, an `access_token` is required to post a message to Slack with the slackclient library, so it is convenient to store the `access_token`
as an Airflow Variable; set it in advance. (Hard-coding the `access_token` in the DAG file is not a good idea.)
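The setup script below reads these values from airflow/config/.secrets.conf via eval. A minimal sketch of that file, assuming plain shell key=value syntax (the values are placeholders; keep this file out of version control):

# airflow/config/.secrets.conf (hypothetical example)
slack_access_token=xoxb-your-access-token
project_id=your_project_id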
#!/usr/bin/env bash
ENVIRONMENT_NAME=dev-composer
LOCATION=us-central1
# Read the secret variables
eval `cat airflow/config/.secrets.conf`
echo ${slack_access_token}
# Create the Cloud Composer environment
gcloud composer environments create ${ENVIRONMENT_NAME} \
--location ${LOCATION} \
--python-version 3
# Install the libraries into the Airflow environment
gcloud composer environments update ${ENVIRONMENT_NAME} \
--update-pypi-packages-from-file airflow/config/requirements.txt \
--location ${LOCATION}
# Set Airflow variables (one `variables --set` call per variable)
gcloud composer environments run ${ENVIRONMENT_NAME} \
--location=${LOCATION} \
variables -- --set slack_access_token ${slack_access_token}
gcloud composer environments run ${ENVIRONMENT_NAME} \
--location=${LOCATION} \
variables -- --set project_id ${project_id}
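To confirm the registration, you can read a variable back through the same CLI (a hypothetical check, assuming the Airflow 1.x variables subcommand):

gcloud composer environments run ${ENVIRONMENT_NAME} \
--location=${LOCATION} \
variables -- --get slack_access_token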
The DAG file created this time is shown in full below. Since it is hard to take in at once, I will then walk through the code task by task.
import os
import airflow
import datetime
from airflow.models import Variable
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.contrib.operators.dataflow_operator import DataFlowPythonOperator
from airflow.operators.slack_operator import SlackAPIPostOperator
from airflow import configuration
from airflow.utils.trigger_rule import TriggerRule
from airflow.contrib.operators.mlengine_operator \
import MLEngineTrainingOperator, MLEngineBatchPredictionOperator
BUCKET = 'gs://your_bucket'
PROJECT_ID = Variable.get('project_id')
REGION = 'us-central1'
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)
PACKAGE_URI = BUCKET + '/code/trainer-0.1.tar.gz'
OUTDIR = BUCKET + '/trained_model'
DATAFLOW_TRAIN_FILE = os.path.join(
configuration.get('core', 'dags_folder'),
'dataflow', 'extract_train_data.py')
DATAFLOW_PRED_FILE = os.path.join(
configuration.get('core', 'dags_folder'),
'dataflow', 'extract_pred_data.py')
DATAFLOW_LOAD_FILE = os.path.join(
configuration.get('core', 'dags_folder'),
'dataflow', 'load.py')
DEFAULT_ARGS = {
'owner': 'Composer Example',
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'start_date': YESTERDAY,
'project_id': Variable.get('project_id'),
'dataflow_default_options': {
'project': Variable.get('project_id'),
'temp_location': 'gs://your_composer_bucket/temp',
'runner': 'DataflowRunner'
}
}
def get_date():
jst_now = datetime.datetime.now()
dt = datetime.datetime.strftime(jst_now, "%Y-%m-%d")
return dt
with airflow.DAG(
'asl_ml_pipeline',
catchup=False,
default_args=DEFAULT_ARGS,
schedule_interval=datetime.timedelta(days=1)) as dag:
start = DummyOperator(task_id='start')
####
#Tasks for training in ML Engine
####
job_id = 'dev-train-{}'.\
format(datetime.datetime.now().strftime('%Y%m%d%H%M'))
job_dir = BUCKET + '/jobs/' + job_id
submit_train_job = MLEngineTrainingOperator(
task_id='train-model',
project_id=PROJECT_ID,
job_id=job_id,
package_uris=[PACKAGE_URI],
region=REGION,
training_python_module='trainer.task',
training_args=[f'--output_dir={OUTDIR}',
f'--job_dir={job_dir}',
'--dropout_rate=0.5',
'--batch_size=128',
'--train_step=1'
],
scale_tier='BASIC_GPU',
python_version='3.5'
)
####
#Task to deploy model
####
BASE_VERSION_NAME = 'v1_0'
VERSION_NAME = '{0}_{1}'.\
format(BASE_VERSION_NAME, datetime.datetime.now().strftime('%Y_%m_%d'))
MODEL_NAME = 'dev_model'
deploy_model = BashOperator(
task_id='deploy-model',
bash_command='gcloud ml-engine versions create '
'{{ params.version_name}} '
'--model {{ params.model_name }} '
'--origin $(gsutil ls gs://your_bucket/trained_model/export/exporter | tail -1) '
'--python-version="3.5" '
'--runtime-version=1.14 ',
params={'version_name': VERSION_NAME,
'model_name': MODEL_NAME}
)
####
#Tasks for batch prediction in ML Engine
####
today = get_date()
input_path = BUCKET + f'/preprocess/{today}/prediction/prediction-*'
output_path = BUCKET + f'/result/{today}/'
batch_prediction = MLEngineBatchPredictionOperator(
task_id='batch-prediction',
data_format='TEXT',
region=REGION,
job_id='dev-pred-{}'.format(datetime.datetime.now().strftime('%Y%m%d%H%M')),  # job ids must be unique per project, so do not reuse the training job id
input_paths=[input_path],  # the operator expects a list of paths
output_path=output_path,
model_name=MODEL_NAME,
version_name=VERSION_NAME
)
####
#Task to extract data with DataFlow
####
job_args = {
'output': 'gs://your_bucket/preprocess'
}
create_train_data = DataFlowPythonOperator(
task_id='create-train-data',
py_file=DATAFLOW_TRAIN_FILE,
options=job_args
)
create_pred_data = DataFlowPythonOperator(
task_id='create-pred-data',
py_file=DATAFLOW_PRED_FILE,
options=job_args
)
####
#Task to load data into BigQuery with DataFlow
####
load_results = DataFlowPythonOperator(
task_id='load_pred_results',
py_file=DATAFLOW_LOAD_FILE
)
post_success_slack_train = SlackAPIPostOperator(
task_id='post-success-train-to-slack',
token=Variable.get('slack_access_token'),
text='Train is succeeded',
channel='#feed'
)
post_fail_slack_train = SlackAPIPostOperator(
task_id='post-fail-train-to-slack',
token=Variable.get('slack_access_token'),
trigger_rule=TriggerRule.ONE_FAILED,
text='Train is failed',
channel='#feed'
)
####
#Task to POST a message to Slack
####
post_success_slack_pred = SlackAPIPostOperator(
task_id='post-success-pred-to-slack',
token=Variable.get('slack_access_token'),
text='Prediction is succeeded',
channel='#feed'
)
post_fail_slack_pred = SlackAPIPostOperator(
task_id='post-fail-pred-to-slack',
token=Variable.get('slack_access_token'),
trigger_rule=TriggerRule.ONE_FAILED,
text='Prediction is failed',
channel='#feed'
)
end = DummyOperator(task_id='end')
start >> [create_train_data, create_pred_data] >> submit_train_job \
>> [post_fail_slack_train, post_success_slack_train]
post_fail_slack_train >> end
post_success_slack_train >> deploy_model >> batch_prediction \
>> load_results \
>> [post_success_slack_pred, post_fail_slack_pred] >> end
This is the first task in the training phase (the red frame in the figure below). Data is extracted from BigQuery using Dataflow and placed in the appropriate GCS bucket.
- Constants `DATAFLOW_TRAIN_FILE` / `DATAFLOW_PRED_FILE`
  - The paths where the Dataflow executable files are located
  - Files under the `dags` directory of the bucket created when building the Cloud Composer environment are synchronized to the Airflow workers every few seconds (see the upload example below)
- Constant `DEFAULT_ARGS`
  - The settings used when executing Dataflow are given in the `dataflow_default_options` argument
- `DataFlowPythonOperator` class
  - A class for running a Dataflow job from a Python file
  - The `py_file` argument takes the path of the executable file
  - The `options` argument specifies the arguments passed to the executable file
    - This time, the GCS path where the extracted data is placed is specified
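For reference, files can be uploaded into that dags folder with the gcloud CLI; a sketch reusing the environment name and location from the setup script (the local path is an assumption for illustration):

gcloud composer environments storage dags import \
--environment ${ENVIRONMENT_NAME} \
--location ${LOCATION} \
--source airflow/dags

The constants and tasks involved in this step are shown again below.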
DATAFLOW_TRAIN_FILE = os.path.join(
configuration.get('core', 'dags_folder'),
'dataflow', 'extract_train_data.py')
DATAFLOW_PRED_FILE = os.path.join(
configuration.get('core', 'dags_folder'),
'dataflow', 'extract_pred_data.py')
DEFAULT_ARGS = {
'owner': 'Composer Example',
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'start_date': YESTERDAY,
'project_id': Variable.get('project_id'),
'dataflow_default_options': {
'project': Variable.get('project_id'),
'temp_location': 'gs://your_composer_bucket/temp',
'runner': 'DataflowRunner'
}
}
####
#Task to extract data with DataFlow
####
#File path when putting data in GCS
job_args = {
'output': 'gs://your_bucket/preprocess'
}
create_train_data = DataFlowPythonOperator(
task_id='create-train-data',
py_file=DATAFLOW_TRAIN_FILE,
options=job_args
)
create_pred_data = DataFlowPythonOperator(
task_id='create-pred-data',
py_file=DATAFLOW_PRED_FILE,
options=job_args
)
The following file splits the data into train data and test data and places them in GCS for training.
(For the sake of brevity, I will omit a detailed explanation of the splitting query: assuming the table has a timestamp column, it hashes that column and splits on the remainder after dividing the hash by 10.)
There are two points to keep in mind here.

- Converting to a CSV file
  - Since the final output should be a CSV file, the data extracted from BigQuery must be converted to a comma-separated format
  - Here, a function called `to_csv` is prepared for that purpose
- The Python version of Dataflow
  - Since Dataflow runs on Python 3.5.x, syntax added in 3.6 such as f-strings cannot be used
import os
import argparse
import logging
from datetime import datetime
import apache_beam as beam
from apache_beam.options.pipeline_options import \
PipelineOptions
PROJECT = 'your_project_id'
def create_query(phase):
base_query = """
SELECT
*,
MOD(ABS(FARM_FINGERPRINT(CAST(timestamp AS STRING))), 10) AS hash_value
FROM
`dataset.your_table`
"""
if phase == 'TRAIN':
subsample = """
hash_value < 7
"""
elif phase == 'TEST':
subsample = """
hash_value >= 7
"""
query = """
SELECT
column1,
column2,
column3,
row_number()over() as key
FROM
({0})
WHERE {1}
""".\
format(base_query, subsample)
return query
def to_csv(line):
csv_columns = 'column1,column2,column3,key'.split(',')
rowstring = ','.join([str(line[k]) for k in csv_columns])
return rowstring
def get_date():
jst_now = datetime.now()
dt = datetime.strftime(jst_now, "%Y-%m-%d")
return dt
def run(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument(
'--output',
required=True
)
known_args, pipeline_args = \
parser.parse_known_args(argv)
options = PipelineOptions(pipeline_args)
with beam.Pipeline(options=options) as p:
for phase in ['TRAIN', 'TEST']:
query = create_query(phase)
date = get_date()
output_path = os.path.join(known_args.output, date,
'train', "{}".format(phase))
read = p | 'ExtractFromBigQuery_{}'.format(phase) >> beam.io.Read(
beam.io.BigQuerySource(
project=PROJECT,
query=query,
use_standard_sql=True
)
)
convert = read | 'ConvertToCSV_{}'.format(phase) >> beam.Map(to_csv)
convert | 'WriteToGCS_{}'.format(phase) >> beam.io.Write(
beam.io.WriteToText(output_path, file_name_suffix='.csv'))
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
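For reference, this script can also be submitted by hand; a hypothetical invocation using the same bucket and project placeholders as above (--project, --runner, and --temp_location are standard Beam pipeline options consumed via PipelineOptions):

python extract_train_data.py \
--output gs://your_bucket/preprocess \
--project your_project_id \
--runner DataflowRunner \
--temp_location gs://your_composer_bucket/temp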
From here, I will explain the tasks that train the model on ML Engine and deploy the trained model.
Note that I will omit the explanation of the executable file task.py
and the model file model.py
used by ML Engine.
This time we prepare a dedicated bucket for ML Engine, so note that two buckets are used in total when combined with the bucket created when building the Cloud Composer environment.
├── your_bucket
│ ├── code //Model required for learning.py and task.Put a gz file with py etc.
│ │
│ └── trainde_model //The trained model file is placed
│
└── your_composer_bucket //Bucket created when building cloud composer environment
- Constant `PACKAGE_URI`
  - The GCS path where the ML Engine executable package is located
  - This time, the gz file containing `task.py` and `model.py` is placed under `gs://your_bucket/code` shown above
  - This constant is passed to the `package_uris` argument of the `MLEngineTrainingOperator` class
- `BashOperator` class
  - A class for executing bash commands
  - This time, a bash command is used to deploy the trained model file
  - The model is deployed by running `gcloud ml-engine versions create` (the model itself must already exist; it can be created beforehand with `gcloud ml-engine models create`)
  - You could (probably) achieve the same thing with the `MLEngineVersionOperator` class, but this time I used the bash command; a sketch of that alternative follows this list
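A rough sketch, not from the original pipeline, of deploying with MLEngineVersionOperator instead. Unlike the bash command, deploymentUri must already point at the concrete export directory (the bash version resolves it at runtime with `gsutil ls ... | tail -1`), so the path below is a placeholder:

# Sketch of the MLEngineVersionOperator alternative (airflow 1.10 contrib).
from airflow.contrib.operators.mlengine_operator import MLEngineVersionOperator

create_version = MLEngineVersionOperator(
    task_id='create-version',
    project_id=PROJECT_ID,
    model_name=MODEL_NAME,
    version={
        'name': VERSION_NAME,
        # placeholder: must be resolved to the actual timestamped export dir
        'deploymentUri': BUCKET + '/trained_model/export/exporter/1234567890',
        'runtimeVersion': '1.14',
        'pythonVersion': '3.5',
    },
    operation='create'
)

The actual code used this time is below.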
PACKAGE_URI = BUCKET + '/code/trainer-0.1.tar.gz'
OUTDIR = BUCKET + '/trained_model'
job_id = 'dev-train-{}'.\
format(datetime.datetime.now().strftime('%Y%m%d%H%M'))
job_dir = BUCKET + '/jobs/' + job_id
submit_train_job = MLEngineTrainingOperator(
task_id='train-model',
project_id=PROJECT_ID,
job_id=job_id,
package_uris=[PACKAGE_URI],
region=REGION,
training_python_module='trainer.task',
training_args=[f'--output_dir={OUTDIR}',
f'--job_dir={job_dir}',
'--dropout_rate=0.5',
'--batch_size=128',
'--train_step=1'
],
scale_tier='BASIC_GPU',
python_version='3.5'
)
today = get_date()
BASE_VERSION_NAME = 'v1_0'
VERSION_NAME = '{0}_{1}'.\
format(BASE_VERSION_NAME, datetime.datetime.now().strftime('%Y_%m_%d'))
MODEL_NAME = 'dev_model'
deploy_model = BashOperator(
task_id='deploy-model',
bash_command='gcloud ml-engine versions create '
'{{ params.version_name}} '
'--model {{ params.model_name }} '
'--origin $(gsutil ls gs://your_bucket/trained_model/export/exporter | tail -1) '
'--python-version="3.5" '
'--runtime-version=1.14 ',
params={'version_name': VERSION_NAME,
'model_name': MODEL_NAME}
)
Here, I will explain the task that runs batch prediction using the model deployed earlier.

- Constant `input_path`
  - The path of the CSV files that were extracted to GCS for prediction earlier
- Constant `output_path`
  - The GCS path where the prediction results are placed
input_path = BUCKET + f'/preprocess/{today}/prediction/prediction-*'
output_path = BUCKET + f'/result/{today}/'
batch_prediction = MLEngineBatchPredictionOperator(
task_id='batch-prediction',
data_format='TEXT',
region=REGION,
job_id='dev-pred-{}'.format(datetime.datetime.now().strftime('%Y%m%d%H%M')),  # job ids must be unique per project, so do not reuse the training job id
input_paths=[input_path],  # the operator expects a list of paths
output_path=output_path,
model_name=MODEL_NAME,
version_name=VERSION_NAME
)
This section describes the task of loading the prediction results produced above into BigQuery using Dataflow.
The DAG code is similar to the "extract data from BigQuery" task described at the beginning.
The constant DATAFLOW_LOAD_FILE
specifies the GCS file path where the DataFlow executable file is located.
DATAFLOW_LOAD_FILE = os.path.join(
configuration.get('core', 'dags_folder'),
'dataflow', 'load.py')
load_results = DataFlowPythonOperator(
task_id='load_pred_results',
py_file=DATAFLOW_LOAD_FILE
)
The following file reads the prediction files placed in GCS, converts each line to JSON, and loads them into the appropriate BigQuery table. Points to be careful about here:

- The batch prediction results are placed in GCS as JSON-formatted text files
- Since the prediction values in each line are written as lists, loading them as-is would cause a type mismatch error
  - `{"key": [0], "predictions": [3.45...]}`
  - The `convert` function below therefore takes the first element of each list
import logging
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
BUCKET_NAME = 'your_bucket'
INPUT = 'gs://{}/result/prediction.results-*'.format(BUCKET_NAME)
def convert(line):
import json
record = json.loads(line)
return {'key': record['key'][0], 'predictions': record['predictions'][0]}
def run(argv=None):
parser = argparse.ArgumentParser()
known_args, pipeline_args = \
parser.parse_known_args(argv)
options = PipelineOptions(pipeline_args)
with beam.Pipeline(options=options) as p:
dataset = 'your_dataset.results'
read = p | 'ReadPredictionResult' >> beam.io.ReadFromText(INPUT)
json = read | 'ConvertJson' >> beam.Map(convert)
json | 'WriteToBigQuery' >> beam.io.Write(beam.io.BigQuerySink(
dataset,
schema='key:INTEGER, predictions:FLOAT',
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
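After the load completes, the results can be checked from the CLI; a hypothetical query against the table name used above:

bq query --use_legacy_sql=false \
'SELECT key, predictions FROM `your_dataset.results` ORDER BY key LIMIT 10'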
I usually use AWS, but since I had the opportunity to work with GCP services, I summarized the ML-related ones. I hope this is helpful for anyone thinking through their ML workflow.
One caveat: this pipeline deploys the model trained by ML Engine as-is, which is not recommended. It is better to build a mechanism that measures and compares the accuracy of the newly trained model, and to deploy only after such a validation task passes; a rough sketch follows.
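A rough, hand-written sketch (not part of the original pipeline) of gating deployment on an evaluation step; evaluate_new_model() and get_current_accuracy() are hypothetical helpers you would implement:

# Sketch: branch to deployment only if the new model is at least as good.
from airflow.operators.python_operator import BranchPythonOperator

def _check_accuracy():
    new_accuracy = evaluate_new_model()        # hypothetical: score the freshly trained model
    current_accuracy = get_current_accuracy()  # hypothetical: score of the deployed model
    # BranchPythonOperator follows the task_id returned here
    return 'deploy-model' if new_accuracy >= current_accuracy else 'post-fail-train-to-slack'

check_accuracy = BranchPythonOperator(
    task_id='check-accuracy',
    python_callable=_check_accuracy
)

# Wired between training and deployment:
# submit_train_job >> check_accuracy >> [deploy_model, post_fail_slack_train]

Something along these lines keeps a poorly performing model from reaching production automatically.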