(Updated after the 5/31 v2.0.0 release)
Now that the Python version of Cloud Dataflow is finally GA, I checked whether Template execution works with it. It turns out that a pipeline registered in advance ~~(although parameters could not be passed as of 3/23)~~ (5/31: parameters can now be passed with the 2.0.0 release) can be launched from App Engine, so I would like to share the steps.
Template execution is a feature that lets you register a Dataflow pipeline in GCS in advance and then run the registered pipeline at any time, passing arbitrary parameters. By invoking Template execution from App Engine, you can easily run data processing and analysis jobs from your program without setting up a server just to launch the pipeline. You can also use cron to run your data analysis pipeline on a regular schedule, as in the sketch below.
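For example, a minimal App Engine cron.yaml along the following lines would do this; note the /launch-template path and the schedule are hypothetical and assume you expose the Template launch code shown later in this article as an App Engine handler.

cron.yaml
cron:
- description: "launch the registered Dataflow template once a day"
  url: /launch-template
  schedule: every 24 hours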
It may not see much use while you are still iterating through trial and error to improve accuracy with machine learning, but once the system goes into real operation, being able to run a complicated data processing pipeline without worrying about server operation should make life much easier for the operator. It also seems that development gets easier if the pipeline used during trial and error can be carried over into production in nearly the same form. (Besides the difficulty of building a high-accuracy model, shaping the finished machine learning model into a system that runs stably is itself quite hard.)
Dataflow Template execution takes the following steps:

1. Modify the pipeline so that it receives runtime parameters via ValueProvider
2. Register the pipeline as a Template in GCS
3. Execute the registered Template (from App Engine, the console, etc.)

Below, I would like to explain the procedure for each step.
Define a custom option class to receive parameters passed in from outside. In Beam, the program refers to parameters passed in at runtime via the ValueProvider class. The PipelineOptions class has its own parser with an add_value_provider_argument method for reading parameters as ValueProviders. Create a custom option class that inherits from PipelineOptions, and describe the parameters you want to add to the parser in the _add_argparse_args method, which is called at initialization. In the example below, input, output and date are specified as custom parameters.
pipeline.py
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class MyOptions(PipelineOptions):

    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--input',
            default="gs://{mybucket}/{pathtofile}",
            help='input gcs file path')
        parser.add_value_provider_argument(
            '--output',
            default="gs://{mybucket}/{pathtofile}",
            help='output gcs file path')
        parser.add_value_provider_argument(
            '--date',
            default="20170531",
            help='today')

options = MyOptions()
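For reference, when the pipeline is run directly rather than as a Template, these same options can also be given on the command line, in which case Beam should wrap them as static ValueProviders. A hypothetical invocation (the bucket and file paths are placeholders):

python pipeline.py --input gs://{mybucket}/{pathtofile} --output gs://{mybucket}/{pathtofile} --date 20170531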
In the pipeline processing, modify the code so that the values are used through ValueProvider. Each ValueProvider can be referenced as an attribute of the custom option class created above. The program has to obtain the value lazily via the ValueProvider, calling .get() at execution time. If you want to use a ValueProvider inside a PTransform or DoFn, pass the ValueProvider to the constructor, keep it as an instance variable, and call .get() on it internally when you need the value. Note that the ReadFromText and WriteToText classes provided by Beam can take a ValueProvider directly as an argument. In the following example, the file specified by the external parameter input is read, each line is replaced with the string specified by date, and the result is written to the path specified by output.
pipeline.py
class MyDoFn(beam.DoFn):

    # Receive the ValueProvider in the constructor and keep it as an instance variable
    def __init__(self, date):
        self._date = date

    def process(self, element):
        yield self._date.get()  # obtain the value with the .get() method


p = beam.Pipeline(options=options)

(p | "Read"  >> beam.io.ReadFromText(options.input)
   | "DoFn"  >> beam.ParDo(MyDoFn(options.date))  # pass the ValueProvider to the DoFn constructor
   | "Write" >> beam.io.WriteToText(options.output))

p.run()
If you add the GCS path where the Template should be registered to the Google Cloud options and run the program, then instead of the pipeline being executed, a Template file describing the pipeline's processing is registered at the specified GCS path. The execution environment can be anywhere the DataflowRunner can run the pipeline.
pipeline.py
options = MyOptions()

# Specify DataflowRunner as the runner
options.view_as(beam.options.pipeline_options.StandardOptions).runner = 'DataflowRunner'

# Specify in template_location the GCS path where the Template will be registered
google_cloud_options = options.view_as(beam.options.pipeline_options.GoogleCloudOptions)
google_cloud_options.template_location = 'gs://{your bucket}/mytemplate'

# Run the pipeline
p = beam.Pipeline(options=options)

# ~ pipeline processing code ~

p.run().wait_until_finish()
When the above is executed, a Template file describing the pipeline's processing is generated at the specified GCS path.
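You can check that the Template file was actually generated with, for example, gsutil (the path is the placeholder bucket used above):

gsutil ls gs://{your bucket}/mytemplate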
To execute the registered Template, you send a request to the Google Dataflow REST API. The Google Cloud Client Library does not seem to support this yet (as of March 2017), so here we use the Google APIs Client Library. Before using it, install the Dataflow API Client Library (v1b3 seems to be the latest version as of 3/23). If you specify the GCS path created by the run above in gcsPath of the request body and send the request, a job is created from the Template and executed. Sample code for Go and Python is shown below, but it should be possible to call the API from the client library of other languages as well. (I tried the Python version locally, but I haven't tried it on App Engine, so please let me know if you run into any problems.)
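For the Python example below, one way to install the client libraries is with pip; these packages provide the apiclient.discovery and oauth2client modules used in the sample.

pip install google-api-python-client oauth2client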
Go
import (
    "net/http"

    "golang.org/x/net/context"
    "golang.org/x/oauth2"
    "golang.org/x/oauth2/google"
    "google.golang.org/appengine"
    dataflow "google.golang.org/api/dataflow/v1b3"
    "google.golang.org/appengine/urlfetch"
)

// omitted

func handler(w http.ResponseWriter, r *http.Request) {
    c := appengine.NewContext(r)
    client := &http.Client{
        Transport: &oauth2.Transport{
            Source: google.AppEngineTokenSource(c, "https://www.googleapis.com/auth/cloud-platform"),
            Base:   &urlfetch.Transport{Context: c},
        },
    }
    service, err := dataflow.New(client)
    templates := dataflow.NewProjectsTemplatesService(service)
    req := &dataflow.CreateJobFromTemplateRequest{
        GcsPath: "gs://{your bucket}/mytemplate",
        JobName: "sklearn",
        Parameters: map[string]string{
            "input":  "gs://{yourbucket}/{pathtofile1}",
            "output": "gs://{yourbucket}/{pathtofile2}",
            "date":   "20170601",
        },
    }
    job, err := templates.Create("{your project}", req).Do()
    // omitted
}
Python
from oauth2client.client import GoogleCredentials
from apiclient.discovery import build

credentials = GoogleCredentials.get_application_default()
service = build("dataflow", "v1b3", credentials=credentials)
templates = service.projects().templates()

body = {
    "environment": {
        "bypassTempDirValidation": False,
        "tempLocation": "gs://{your bucket}/temp",
        # "serviceAccountEmail": "A String",
        # "zone": "us-central1-f",
        "maxWorkers": 1,
    },
    "gcsPath": "gs://{your bucket}/mytemplate",
    "parameters": {
        "input": "gs://{yourbucket}/{pathtofile1}",
        "output": "gs://{yourbucket}/{pathtofile2}",
        "date": "20170601",
    },
    "jobName": "sklearn",
}

req = templates.create(projectId="{your project}", body=body)
res = req.execute()
It seems that the only required items in the body are gcsPath and jobName. jobName should be a string that is unique among running jobs. parameters specifies the runtime parameters you want to pass to the pipeline. The response contains the job ID, so keep it if you may want to cancel the job later.
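For example, the job ID can be taken from the response like this (id is a field of the v1b3 Job resource returned by templates.create):

Python
job_id = res.get("id")  # keep this if you may need to cancel the job later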
Incidentally, the registered pipeline can also be executed from the console. On the screen reached from [+ RUN JOB] at the top of the Dataflow console, select a custom template and specify the GCS path of the registered Template to run a job.
Besides starting jobs from a Template, you also need a way to stop them, for example when you find a problem after a job has started, or when you want to run a streaming-mode job only for a certain period of time. To stop a job, use the same Dataflow REST API to update the job, specifying "JOB_STATE_CANCELLED" as the requested state. A Python code example is shown below.
Python
jobs = service.projects().jobs()

body = {
    "requestedState": "JOB_STATE_CANCELLED"
}

req = jobs.update(projectId="{your project}", jobId="{job ID}", body=body)
res = req.execute()
This will cancel the job and delete the started cluster.
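If you want to confirm that the cancellation has taken effect, you can fetch the job again with the same API and check its state, roughly as follows (currentState is a field of the v1b3 Job resource):

Python
req = jobs.get(projectId="{your project}", jobId="{job ID}")
res = req.execute()
print(res.get("currentState"))  # shows JOB_STATE_CANCELLED once the cancel has completed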
You can now also run a pre-built data analysis pipeline periodically, for example with cron on App Engine. This expands the range of Dataflow usage: not just preprocessing in the verification phase of data analysis, but also data collection and processing in the operation phase. Since pipelines are easy to create, writing the preprocessing workflow in the verification phase with production data collection already in mind also makes it easier to surface problems early, for example discovering only during system development that the data you assumed during verification is unexpectedly expensive to acquire and having to tearfully redo the modeling.
I think one of GCP's strengths is that application developers and machine learning engineers can concentrate on development and data analysis by leaving troublesome infrastructure construction and operation to the cloud. Just as App Engine did for web application development, I expect Dataflow to take on the building and operation of data processing pipelines, which tend to be a burden in data analysis.