Run Cloud Dataflow (Python) from App Engine

(Updated after 5/31 v2.0.0 release)

The Python version of Cloud Dataflow is finally GA, so I checked whether Template execution works with it. It turns out that a pipeline registered in advance ~~(although parameters could not be passed as of 3/23)~~ (5/31: parameters can now be passed with the 2.0.0 release) can be launched from App Engine, so I would like to share the steps.

What is Cloud Dataflow Template execution?

Template execution is a feature that lets you register a Dataflow pipeline in GCS in advance and run the registered pipeline at any time, passing arbitrary parameters. By invoking Template execution from App Engine, you can run data processing and analysis pipelines from your program without setting up a server to launch the pipeline yourself. You can also use cron to run your data analysis pipeline on a regular schedule.

It may not be used much during the trial-and-error stage of improving accuracy with machine learning, but once things go into production it lets you run a complex data processing pipeline without worrying about server operation, which should make life much easier for operators. It also seems that development gets easier if the pipeline used during trial and error can be carried into production in close to its original form. (Besides the difficulty of building a high-accuracy model, turning a trained machine learning model into a system that runs stably can be quite hard.)

Execution method

Dataflow Template execution takes the following steps:

1. Modify the pipeline so that it can take runtime parameters
2. Register the pipeline (Template) in GCS
3. Run the registered Template

Below, I explain the procedure for each step.

Modify the pipeline to use parameters

Define a custom option class to receive parameters passed from the outside. In Beam, the program refers to parameters passed in from outside at runtime via the ValueProvider class. The PipelineOptions class has its own parser with an add_value_provider_argument method for reading parameters as ValueProviders. Create a custom option class that inherits from PipelineOptions, and add the settings for the parameters you want to the parser in the _add_argparse_args method, which is called at initialization. In the example below, input, output, and date are specified as custom parameters.

pipeline.py


import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):

    @classmethod
    def _add_argparse_args(cls, parser):

        parser.add_value_provider_argument(
            '--input',
            default="gs://{mybucket}/{pathtofile}",
            help='input gcs file path')
        
        parser.add_value_provider_argument(
            '--output',
            default="gs://{mybucket}/{pathtofile}",
            help='output gcs file path')

        parser.add_value_provider_argument(
            '--date',
            default="20170531",
            help='today')

options = MyOptions()
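
For reference, when the pipeline is run directly rather than as a Template, these custom options can be filled in from command-line flags, or by passing a flags list to the constructor; any value-provider option left unset when the Template is staged is resolved later, at the time the Template is actually run. A minimal sketch (the paths here are placeholders):

# Sketch: supplying the custom options explicitly as flags
options = MyOptions([
    '--input=gs://{mybucket}/{pathtofile1}',
    '--output=gs://{mybucket}/{pathtofile2}',
    '--date=20170601',
])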

In the pipeline processing, modify the code so that it uses the values from the ValueProviders. Each ValueProvider can be referenced as an attribute of the custom option class created above. The program must fetch the value lazily through the ValueProvider by calling .get() at run time. If you want to use a ValueProvider inside a PTransform or DoFn, pass it in the constructor, keep it as an instance variable, and call .get() on it internally. Note that the ReadFromText and WriteToText classes provided by Beam accept a ValueProvider directly as an argument. In the following example, the file specified by the external parameter input is read, each line is replaced with the string specified by date, and the result is written to the path specified by output.

pipeline.py


class MyDoFn(beam.DoFn):

    # Receive the ValueProvider in the constructor and keep it as an instance variable
    def __init__(self, date):
        self._date = date

    def process(self, element):
        yield self._date.get()  # fetch the value with the .get() method


p = beam.Pipeline(options=options)

(p | "Read"  >> beam.io.ReadFromText(options.input)
   | "DoFn"  >> beam.ParDo(MyDoFn(options.date))  #Pass ValueProvider to DoFn constructor
   | "Write" >> beam.io.WriteToText(options.output))

p.run()

Register the pipeline with GCS

If you add the GCS path where the Template should be registered to Google Cloud Options and run the program, the pipeline is not executed; instead, a Template file describing the pipeline's processing is registered at the specified GCS path. The execution environment can be anywhere that can run a pipeline with DataflowRunner.

pipeline.py


options = MyOptions()

# Specify DataflowRunner as the runner
options.view_as(beam.options.pipeline_options.StandardOptions).runner = 'DataflowRunner'

# Specify the GCS path where the Template will be registered in template_location
google_cloud_options = options.view_as(beam.options.pipeline_options.GoogleCloudOptions)
google_cloud_options.template_location = 'gs://{your bucket}/mytemplate'

# Build the pipeline (with template_location set, running it stages the Template instead of launching a job)
p = beam.Pipeline(options=options)

# ~ pipeline processing code ~

p.run().wait_until_finish()

When the above is run, a Template file describing the pipeline's processing is generated at the specified GCS path.

Run the registered pipeline

To execute the registered Template, you send a request to the Google REST API. The Google Cloud Client Library does not seem to support this yet (as of March 2017), so the Google APIs Client Library is used here. Before using it, install the Dataflow API Client Library (v1b3 appears to be the latest version as of 3/23). Specify the GCS path created by the execution above in gcsPath in the request body and make the call, and a job will be created and run from the Template. Sample code in Go and Python is shown below, but it should also be possible to call the API from client libraries in other languages. (I tried the Python version locally but have not tried it on App Engine, so please let me know if you run into any problems.)

Go


import (
  "net/http"
  "golang.org/x/net/context"
  "golang.org/x/oauth2"
  "golang.org/x/oauth2/google"
  "google.golang.org/appengine"
  dataflow "google.golang.org/api/dataflow/v1b3"
  "google.golang.org/appengine/urlfetch"
)
// omitted
func handler(w http.ResponseWriter, r *http.Request) {
  c := appengine.NewContext(r)
  client := &http.Client{
    Transport: &oauth2.Transport{
      Source: google.AppEngineTokenSource(c, "https://www.googleapis.com/auth/cloud-platform"),
      Base:   &urlfetch.Transport{Context: c},
    },
  }

  service, err := dataflow.New(client)
  templates := dataflow.NewProjectsTemplatesService(service)
  req := &dataflow.CreateJobFromTemplateRequest{
    GcsPath: "gs://{your bucket}/mytemplate",
    JobName: "sklearn",
    Parameters: map[string]string{
      "input": "gs://{yourbucket}/{pathtofile1}",
      "output": "gs://{yourbucket}/{pathtofile2}",
      "date": "20170601",
    },
  }
  job, err := templates.Create("{your project}", req).Do()
  // omitted
}

Python


from oauth2client.client import GoogleCredentials
from googleapiclient.discovery import build

credentials = GoogleCredentials.get_application_default()
service = build("dataflow", "v1b3", credentials=credentials)
templates = service.projects().templates()

body = {
    "environment": {
        "bypassTempDirValidation": False,
        "tempLocation": "gs://{your bucket}/temp",
        #"serviceAccountEmail": "A String",
        #"zone": "us-central1-f",
        "maxWorkers": 1,
    },
    "gcsPath": "gs://{your bucket}/mytemplate",
    "parameters": {
      "input": "gs://{yourbucket}/{pathtofile1}",
      "output": "gs://{yourbucket}/{pathtofile2}",
      "date": "20170601",
    },
    "jobName": "sklearn",
}
req = templates.create(projectId="{your project}", body=body)
res = req.execute()

The only required items in the body appear to be gcsPath and jobName. jobName should be a string that is unique among the jobs being executed. parameters specifies the runtime parameters you want to pass to the pipeline. The response contains the job ID, so keep it if you want to cancel the job later.
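
For example, since the response to templates.create is a Job resource, the ID can be taken directly from res (a small sketch based on the v1b3 API reference):

# Keep the job ID from the returned Job resource for later cancellation
job_id = res.get("id")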

By the way, the registered pipeline can also be run from the console. Click [+ RUN JOB] at the top of the Dataflow console screen, select a custom template on the screen that appears, and specify the GCS path of the registered Template to run a job. (Screenshot: dataflowjob.png)
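
As noted above, I have only tried the Python version locally. For reference, here is a rough, untested sketch of how the same call might be wrapped in an App Engine (standard environment, Python 2.7 / webapp2) handler; the /launch route, the handler name, and the way the date parameter is read are just for illustration:

Python


import webapp2
from oauth2client.client import GoogleCredentials
from googleapiclient.discovery import build


class LaunchTemplateHandler(webapp2.RequestHandler):
    def get(self):
        # On App Engine, application default credentials resolve to the app's service account
        credentials = GoogleCredentials.get_application_default()
        service = build("dataflow", "v1b3", credentials=credentials)
        body = {
            "jobName": "sklearn",
            "gcsPath": "gs://{your bucket}/mytemplate",
            "parameters": {
                "input": "gs://{yourbucket}/{pathtofile1}",
                "output": "gs://{yourbucket}/{pathtofile2}",
                "date": self.request.get("date", "20170601"),
            },
        }
        res = service.projects().templates().create(
            projectId="{your project}", body=body).execute()
        self.response.write(res.get("id", ""))


app = webapp2.WSGIApplication([("/launch", LaunchTemplateHandler)])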

Stop the running pipeline

If you start a job from the pipeline and then find a problem, or if you run it in streaming mode and only want it active for a certain period, you need to be able to stop jobs as well as start them from the Template. To stop a job, use the same Dataflow REST API to update the job, specifying the state "JOB_STATE_CANCELLED". Below is a Python code example.

Python


jobs= service.projects().jobs()

body = {
    "requestedState": "JOB_STATE_CANCELLED"
}
req = jobs.update(projectId="{your project}", jobId="{job ID}", body=body)
res = req.execute()

This will cancel the job and delete the started cluster.
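
If you want to confirm that the cancellation actually completed, you can poll the job with jobs.get from the same API and check its currentState; a small sketch, assuming the service object and job ID from the examples above:

Python


import time

# Poll the job until it leaves the cancelling state
while True:
    job = service.projects().jobs().get(
        projectId="{your project}", jobId="{job ID}").execute()
    state = job.get("currentState")
    print(state)  # e.g. JOB_STATE_CANCELLING -> JOB_STATE_CANCELLED
    if state != "JOB_STATE_CANCELLING":
        break
    time.sleep(10)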

In conclusion

By using cron from App Engine, you can also run a data analysis pipeline built in advance on a regular schedule. This expands the range of Dataflow use beyond preprocessing in the verification phase of data analysis to data collection and processing in the operation phase. And since pipelines are easy to create, writing the preprocessing workflow with production data collection in mind already in the verification phase makes it easier to catch problems early, for example discovering during system development that the data you assumed you could use is unexpectedly costly to acquire and having to tearfully redo the modeling.

I think one of GCP's strengths is that application developers and machine learning engineers can concentrate on development and data analysis by leaving troublesome infrastructure construction and operation to the cloud. Just as App Engine did for web application development, I expect Dataflow to take over the construction and operation of data processing pipelines, which tend to be a pain in data analysis.

For Java

Run Cloud Dataflow for Java from App Engine for Go with runtime parameters
