This article is the Day 6 entry in the Puri Puri Appliance Advent Calendar 2019.
In this post, I will introduce how to process large amounts of data with distributed processing using Dataflow (Apache Beam) + Python 3, which I use regularly as an ML engineer.
What I will introduce this time

- How to run jobs on Dataflow using Apache Beam's Python SDK
- How to read a large amount of data from BigQuery, process it, and insert it back into BigQuery

What I will not introduce this time

- Detailed Apache Beam concepts
- Processing with inputs/outputs other than BigQuery
Dataflow is one of the services provided by Google Cloud Platform (Dataflow official website). Because distributed processing is easy to implement and it integrates easily with BigQuery, it is often used as part of an analytics platform. Internally, Dataflow pipelines are implemented with Apache Beam, a framework for pipeline processing, so in actual development you will be working with the Apache Beam SDK (Apache Beam official website). Dataflow supports two Apache Beam SDKs: Java and Python.
Let's walk through the flow from building the development environment to actually running a job on Dataflow.
Let's build a virtual environment for development with Pipenv. (You do not have to use Pipenv as long as the Python and library versions match.) Create the environment from a Pipfile like the one below.
```toml
[[source]]
name = "pypi"
url = "https://pypi.org/simple"
verify_ssl = true

[dev-packages]

[packages]
apache-beam = "==2.14.*"

[requires]
python_version = "3.7"
```
Points to watch out for

- Apache Beam SDK version: Dataflow supports the Apache Beam SDK up to version 2.14, so that version is pinned here and in setup.py below.
- Python version: at the moment (December 6, 2019), the Python SDK officially supported by Dataflow is the Python 2 series, and a warning is emitted when you use Python 3. However, in my experience it is extremely rare for problems to occur, and I consider it safe to operate in production. (That said, I cannot take responsibility, so please use it at your own discretion.)
A setup.py is required in order to save the pipeline as a template on Dataflow and run it. It describes the dependencies needed at runtime. Specify `entry_points` according to your own package configuration.
setup.py

```python
from setuptools import setup, find_packages

# Dependencies required at runtime on the Dataflow workers.
REQUIRED_PACKAGES = [
    "apache-beam[gcp]==2.14.*",
]

setup(
    name='dataflow-sample',
    url='',
    author='',
    author_email='',
    version='0.1',
    install_requires=REQUIRED_PACKAGES,
    packages=find_packages(),
    include_package_data=True,
    entry_points=dict(console_scripts=[
        'sample=sample:main'
    ]),
    description='dataflow sample',
)
```
Next, set up the Apache Beam pipeline and implement the main function. The required steps are as follows.

- Set the runner
- Create an instance of Pipeline
setup_sample.py

```python
import sys

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def setup(args):
    # DirectRunner executes the pipeline locally.
    runner = "DirectRunner"
    return beam.Pipeline(runner, options=PipelineOptions(args))


def main():
    pipeline = setup(sys.argv[1:])
    with pipeline as p:
        # Describe the pipeline here
        pass
```
To actually run on Dataflow, you need to set various options defined in the SDK. Typical ones are listed below.
StandardOptions
name | type | description |
---|---|---|
streaming | boolean | Select streaming mode or batch mode |
SetupOptions
name | type | description |
---|---|---|
setup_file | string | Specify the path of setup.py |
GoogleCloudOptions
name | type | description |
---|---|---|
region | string | Specify the region to use |
project | string | Specify the project ID to use |
job_name | string | Specify a name for the job when it runs (arbitrary value) |
template_location | string | Specify the GCS path where the template is saved |
These options must be specified either in code or as command-line arguments at run time. When specified in code, it looks like the following.
options_sample.py

```python
import apache_beam as beam
from apache_beam.options.pipeline_options import (
    GoogleCloudOptions,
    PipelineOptions,
    SetupOptions,
)


def option_setting(options: PipelineOptions) -> PipelineOptions:
    cloud_options = options.view_as(GoogleCloudOptions)
    cloud_options.region = "asia-northeast1"
    cloud_options.project = "your project id"

    setup_options = options.view_as(SetupOptions)
    setup_options.setup_file = "specify your setup.py path"
    return options


def setup(args):
    runner = "DirectRunner"
    options = option_setting(PipelineOptions(args))
    return beam.Pipeline(runner, options=options)
```
Basically, you retrieve the options you want to set with `PipelineOptions.view_as()`. All that remains is to set values on the properties you want to specify.
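For reference, `PipelineOptions` also accepts command-line style flags as a list of strings, so the same settings can be passed at run time. A minimal sketch, assuming placeholder values for the project ID and setup.py path:

```python
from apache_beam.options.pipeline_options import PipelineOptions

# The flag names mirror the properties set in options_sample.py;
# the project ID and setup.py path are placeholders.
options = PipelineOptions([
    "--project=your-project-id",
    "--region=asia-northeast1",
    "--setup_file=./setup.py",
])
```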
You can also create your own custom options if there are settings you need at run time. The implementation simply inherits from `PipelineOptions` and overrides the required method.
custom_options_sample.py

```python
from apache_beam.options.pipeline_options import PipelineOptions


class CustomOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument(
            '--hoge',
            type=int,
            default=0,
            help='This is a custom value'
        )
```
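As a minimal usage sketch (the `--hoge` flag comes from the class above; the rest is illustrative), a custom option is read back through `view_as()` just like the built-in ones:

```python
import sys

from apache_beam.options.pipeline_options import PipelineOptions

# CustomOptions is the class defined in custom_options_sample.py above.
options = PipelineOptions(sys.argv[1:])
hoge = options.view_as(CustomOptions).hoge  # defaults to 0 if --hoge is not given
```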
Let's define a pipeline that actually reads data from BigQuery and stores it in BigQuery. As a simple example, let's implement the process of extracting only the id from the User table and inserting it into another table.
pipeline.py

```python
import apache_beam as beam
from apache_beam.io.gcp.bigquery import BigQueryDisposition, BigQuerySource


def b2b_pipeline(pipe: beam.Pipeline):
    # SQL executed against BigQuery
    query = "SELECT id, name, age FROM sample.users"

    _ = (pipe
         | "Read from BigQuery" >> beam.io.Read(BigQuerySource(query=query, use_standard_sql=True))
         | "Preprocess" >> beam.Map(lambda data: {"id": data["id"]})
         | "Write to BigQuery" >> beam.io.WriteToBigQuery(
             table="user_ids",
             dataset="sample",  # output dataset assumed to be the same "sample" dataset as the source
             schema="id:INTEGER",
             create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
             write_disposition=BigQueryDisposition.WRITE_APPEND,
         )
         )
```
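To tie this together with the setup code shown earlier, a sketch of `main()` might look like the following (it simply assumes the `setup()` and `b2b_pipeline()` functions defined above):

```python
import sys


def main():
    # setup() and b2b_pipeline() are the functions defined in the samples above.
    pipeline = setup(sys.argv[1:])
    with pipeline as p:
        b2b_pipeline(p)
```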
A pipeline consists of three kinds of operations: input, intermediate transforms, and output. There are many more than the ones introduced this time, so refer to the Official Reference to customize your own pipeline.
Let's run the implemented pipeline locally and on GCP. There are several environments to choose from when running Apache Beam, summarized in the table below.

- Run locally
- Run on Dataflow without saving a template
- Save a template to Dataflow and run it
Use Case | Runner | template_location |
---|---|---|
Run on Local | DirectRunner | None |
Run with Dataflow | DataflowRunner | None |
Run as template in Dataflow | DataflowRunner | Specify the GCS path to save the template |
Saving a template stores the pipeline in GCS so that it can be launched from the console or the command line. This is very useful when you want to run the pipeline on a schedule.
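As a sketch of the template case (the GCS bucket path is a placeholder), all that changes compared to the local setup is the runner and the `template_location` option:

```python
import apache_beam as beam
from apache_beam.options.pipeline_options import GoogleCloudOptions, PipelineOptions


def setup_for_template(args):
    # option_setting() is the helper defined in options_sample.py above;
    # the GCS path below is a placeholder bucket.
    options = option_setting(PipelineOptions(args))
    options.view_as(GoogleCloudOptions).template_location = "gs://your-bucket/templates/sample"
    return beam.Pipeline("DataflowRunner", options=options)
```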
This time, I introduced how to implement an Apache Beam pipeline in Python and run it on Dataflow. I hope you find it helpful.