On GCP, Cloud Dataflow is usually used for ETL pipelines, but as a personal note I tried using it not just for preprocessing, running a pipeline that includes the machine learning step as well.
When doing a lot of trial and error on models, I want to be able to take the learning / prediction scripts that I ran on a small scale locally and execute many of them easily in a distributed environment. Modifying a local script so that it runs remotely, and setting up machines and distributing the work, was a hassle every time. I also want to go from preprocessing such as feature generation through learning and evaluation in one go: if the preprocessing code and the learning code are separated, the prediction model cannot be reproduced unless the versions of the code and the intermediate data are carefully managed, and implementing everything as a single pipeline also seems to make it easy to incorporate into a system.
Install the following on the working machine that will run the pipeline locally or submit jobs to the cloud. (Note that, at the moment, the Python SDK only supports Python 2 when running on the cloud.)
shell
git clone https://github.com/apache/beam.git
cd beam/sdks/python/
python setup.py sdist
cd dist/
pip install apache-beam-sdk-*.tar.gz
shell
pip install --upgrade google-cloud-dataflow
In my environment, versions 0.6.0 and 0.5.5 are installed, respectively. After that, install the libraries you need in your environment, such as scikit-learn and pandas.
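As a quick sanity check before moving on, you can confirm what was installed. This is a minimal sketch; it assumes the google-cloud-dataflow distribution name used in the pip command above and that apache_beam is importable.

python
# Minimal sanity check of the installed SDKs (package name assumed from the steps above)
import pkg_resources
import apache_beam  # fails here if the Beam SDK did not install correctly

print(pkg_resources.get_distribution("google-cloud-dataflow").version)  # expected: 0.5.5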
Here, let's consider the following hypothetical learning / prediction pipeline, using pandas and scikit-learn, which are already installed in the Dataflow execution environment.
The data for both training and evaluation has been prepared in advance and loaded into BigQuery, the hyperparameters are assumed to be fixed, and we want to evaluate a large number of prediction models at once. To cope with the degradation of the model over time, the model is assumed to be retrained every year.
The explanation will proceed using the data acquired by the following query as an example.
sql
SELECT year,date,shop_id,sales,attr1,attr2,attr3
FROM dataset.table
Here, shop_id is the unique key of the store, sales is the target variable, and attr1 to attr3 are the features.
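For concreteness, each row read from BigQuery arrives in the pipeline as a Python dictionary keyed by column name. The values below are made up purely for illustration.

python
# Example of a single row as it comes out of BigQuerySource (values are made up)
row = {
    "year": 2016,
    "date": "2016-01-01",
    "shop_id": "shop_001",
    "sales": 120,
    "attr1": 1.0,
    "attr2": 2.0,
    "attr3": 3.0,
}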
Below, we configure the pipeline options.
Option settings
import apache_beam as beam
import apache_beam.transforms.window as window
options = beam.utils.pipeline_options.PipelineOptions()
google_cloud_options = options.view_as(beam.utils.pipeline_options.GoogleCloudOptions)
google_cloud_options.project = '{YOUR_PROJECT}'
google_cloud_options.job_name = 'sklearn'
google_cloud_options.staging_location = 'gs://{YOUR_BUCKET}/binaries'
google_cloud_options.temp_location = 'gs://{YOUR_BUCKET}/temp'
worker_options = options.view_as(beam.utils.pipeline_options.WorkerOptions)
worker_options.max_num_workers = 10
#options.view_as(beam.utils.pipeline_options.StandardOptions).runner = 'DirectRunner'
options.view_as(beam.utils.pipeline_options.StandardOptions).runner = 'DataflowRunner'
pipeline = beam.Pipeline(options=options)
GoogleCloudOptions describes the settings for running on GCP. staging_location and temp_location specify the GCS locations for staged executables and for temporary files, respectively.
WorkerOptions configures the workers. By default, GCP automatically determines the worker configuration according to the load. (The Japanese documentation states that autoscaling is not supported for Python, but the English documentation states that it is.) Even with autoscaling enabled, you can cap the scale by specifying the maximum number of workers with max_num_workers.
StandardOptions specifies where the pipeline runs. With DirectRunner it runs in your local environment; with DataflowRunner it runs on GCP. A good approach is to verify the pipeline on a small workload locally, then run it on the cloud once there are no problems.
There are many other options; you can check them in the command-line help and in the source comments.
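As an alternative to setting the attributes one by one, the same options can also be passed as command-line style flags when constructing PipelineOptions. This is just a sketch with the same placeholder values as above; adjust the project and bucket names to your environment.

python
# Equivalent configuration passed as command-line style flags (placeholder values)
import apache_beam as beam

options = beam.utils.pipeline_options.PipelineOptions(flags=[
    "--project={YOUR_PROJECT}",
    "--job_name=sklearn",
    "--staging_location=gs://{YOUR_BUCKET}/binaries",
    "--temp_location=gs://{YOUR_BUCKET}/temp",
    "--max_num_workers=10",
    "--runner=DataflowRunner",
])
pipeline = beam.Pipeline(options=options)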
The pipeline is described by connecting each process in order with a pipe operator.
Pipeline
(pipeline
| "Query data" >> beam.Read(beam.io.BigQuerySource(query=query))
| "Assign time" >> beam.Map(assign_timevalue)
| "Set window" >> beam.WindowInto(window.SlidingWindows(size=3, period=1))
| "Assign group key" >> beam.Map(lambda v: (v["shop_id"], v))
| "Group by group key and time window" >> beam.GroupByKey()
| "Learn and predict" >> beam.FlatMap(learn_predict)
| "Write predict data" >> beam.Write(beam.io.BigQuerySink('dataset.table',
schema="shop_id:STRING, date:STRING, predict:FLOAT, sales:INTEGER",
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)))
pipeline.run()
Each step does the following:

1. Read data from BigQuery using the specified query.
2. Assign the reference value that determines which window each record falls into. Since the data is split by year here, the column holding each record's year is used (details below).
3. Specify the width and interval of the windows. The width is 3 years (2 years of training, 1 year of prediction), shifted 1 year at a time, so size=3 and period=1. SlidingWindows is a sliding window; there are many others, such as FixedWindows for fixed windows and Sessions for session windows (see the sketch after this list).
4. Assign the store ID as the key to group on.
5. Group the data by the window and key (store) specified above.
6. Perform learning / prediction on each group of data and return the prediction results. FlatMap is used because the data aggregated per store x window is flattened back out and returned as daily records.
7. Save the daily prediction results to BigQuery.

The pipeline is executed when run() is called on the pipeline at the end.
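For reference (not used in this pipeline), the other window functions mentioned above are constructed in the same way; the sizes below are arbitrary examples.

python
# Other window functions from apache_beam.transforms.window (sizes are arbitrary examples)
import apache_beam.transforms.window as window

sliding = window.SlidingWindows(size=3, period=1)  # the windowing used in this pipeline
fixed = window.FixedWindows(size=1)                # non-overlapping windows of width 1
sessions = window.Sessions(gap_size=2)             # session windows closed after a gap of 2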
Next, let's take a look inside each function.
A function that returns a value for splitting a window
def assign_timevalue(v):
    import apache_beam.transforms.window as window
    return window.TimestampedValue(v, v["year"])
To specify the value used for windowing, wrap the record in a TimestampedValue: the first argument is the value itself, and the second is the timestamp used for windowing. The caveat here is that the import needed to reference a package or module must be written inside the function. There is no problem when running locally, but on the cloud this function is distributed to and executed on worker nodes, so the package must be imported in a way that also works in the worker node's environment. Note likewise that globally defined variables cannot be accessed on the cloud.
Function that performs learning / prediction
def learn_predict(records):
    import pandas as pd
    from sklearn.ensemble import GradientBoostingRegressor

    target_attr = 'sales'
    learn_attrs = ['attr1', 'attr2', 'attr3']

    data = pd.DataFrame(records[1])
    data[learn_attrs] = data[learn_attrs].apply(pd.to_numeric)
    data = data.fillna(0.)

    if len(data["year"].unique()) < 3:
        return []  # Do nothing for combinations with less than 3 years of data

    year_max = data["year"].max()
    train = data[data["year"] < year_max]          # the first 2 years for training
    test = data[data["year"] == year_max].copy()   # the last year for prediction / evaluation

    model = GradientBoostingRegressor()
    model.fit(train[learn_attrs], train[target_attr])
    test.loc[:, "predict"] = model.predict(test[learn_attrs])

    return test[["shop_id", "date", "predict", "sales"]].to_dict(orient='records')
The data is passed to the learning / prediction function as a tuple of the key (store ID) and a list of dictionaries, so the list of values is converted into a DataFrame for learning and prediction. The last line converts the result back into a list of dictionaries so it can be passed to the BigQuery insert in the next stage.
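Because of this input format, the function can also be sanity-checked locally without a pipeline. The following is just a sketch with made-up values: one store, three years, and a few records per year.

python
# Local sanity check of learn_predict with made-up data (one store, 3 years, 5 rows per year)
sample_rows = [
    {"shop_id": "shop_001", "year": y, "date": "%d-01-%02d" % (y, d),
     "sales": 100 + d, "attr1": d, "attr2": d * 2, "attr3": d * 3}
    for y in (2014, 2015, 2016) for d in range(1, 6)
]
print(learn_predict(("shop_001", sample_rows)))  # returns one dict per row of the final year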
When the pipeline is run this way, the prediction results and the ground-truth data end up in BigQuery, so you can compute and evaluate metrics such as RMSE from various angles, for example by store or by year, using SQL.
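The article suggests doing this evaluation in SQL; as an equivalent sketch, the same RMSE-per-store calculation could be done in pandas on rows pulled from the output table. The rows below are made up for illustration.

python
# Hypothetical sketch: RMSE per store computed with pandas on rows exported from the output table
import pandas as pd

results = pd.DataFrame([
    {"shop_id": "shop_001", "date": "2016-01-01", "predict": 101.5, "sales": 100},
    {"shop_id": "shop_001", "date": "2016-01-02", "predict": 98.0,  "sales": 102},
    {"shop_id": "shop_002", "date": "2016-01-01", "predict": 55.0,  "sales": 50},
])  # in practice, fetch these rows from the BigQuery output table

results["sq_error"] = (results["predict"] - results["sales"]) ** 2
rmse_by_shop = results.groupby("shop_id")["sq_error"].mean() ** 0.5
print(rmse_by_shop)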
Running the training process on Dataflow may be a questionable use of the service given its intended purpose, but it did work. This time it was a simple one-way pipeline that trains and predicts from data prepared in BigQuery and saves the results, but it seems possible to change it flexibly: adding more data processing, feeding evaluation data in from another flow, or branching the output so that both the prediction results and the prediction model are passed to later stages. Hyperparameters were assumed to be fixed this time, but I would also like to try massively parallel execution of parameter tuning.
Cloud Dataflow is a service that has not received much attention within GCP yet, but personally I expect it to become something like App Engine for data analysis applications, since Dataflow fully manages the construction and operation of data flows, which tend to be a hassle for applications that involve complex data processing such as machine learning.
This time I used scikit-learn, which is installed in Dataflow by default, but in practice you will want to use various other libraries. Next time, I would like to describe the procedure for installing an arbitrary library, using the installation of xgboost as an example.
Official documentation
Mr. Nakai's easy-to-understand Google data processing technology trilogy
Distributed data processing platform FlumeJava with MapReduce as backend
Streaming processing design pattern realized by "Cloud Dataflow"
Related code
Sample of distributed processing of TensorFlow Prediction with Cloud Dataflow
Special image classification by Cloud Machine Learning and Cloud Dataflow