Process big data with Dataflow (Apache Beam) + Python 3

Introduction

This article is the 6th-day entry of the Puri Puri Appliance Advent Calendar 2019.

In this post, I will introduce how to process large amounts of data with distributed processing using Dataflow (Apache Beam) + Python 3, which I use regularly as an ML engineer.

What this article covers

- How to run jobs on Dataflow using Apache Beam's Python SDK
- How to read a large amount of data from BigQuery, process it, and insert it back into BigQuery

What this article does not cover

- Detailed concepts of Apache Beam
- Processing with inputs/outputs other than BigQuery

What is Dataflow? What is Apache Beam?

Dataflow is one of the services provided by Google Cloud Platform (Dataflow official website). Because distributed processing is easy to implement and it integrates well with BigQuery, it is often used as the foundation of an analytics platform. Internally, Dataflow is built on Apache Beam, a framework for implementing pipeline processing, so in actual development you will be working with the Apache Beam SDK (Apache Beam official website). Dataflow supports two Apache Beam SDKs: Java and Python.
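To give a quick feel for what pipeline processing with Apache Beam looks like, here is a minimal toy example that runs locally with the DirectRunner (it is not part of the BigQuery workflow described below):

hello_beam.py


import apache_beam as beam

# Create a few elements, transform them, and print the results
with beam.Pipeline("DirectRunner") as p:
    _ = (p
        | "Create" >> beam.Create([1, 2, 3])
        | "Square" >> beam.Map(lambda x: x * x)
        | "Print" >> beam.Map(print))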

Let's try it out

Let's take a look at the flow from actually building the environment to running the process on Dataflow.

Environment

Let's build a virtual environment for development using Pipenv. (If your Python and library versions already match, you do not need to use Pipenv.) Launch the environment using a Pipfile like the one below.

[[source]]
name = "pypi"
url = "https://pypi.org/simple"
verify_ssl = true

[dev-packages]

[packages]
apache-beam = {extras = ["gcp"], version = "==2.14.*"}

[requires]
python_version = "3.7"

Points to watch out for

- Apache Beam SDK version: the SDK version supported by Dataflow is up to 2.14.
- Python version: at the time of writing (December 06, 2019), the Python SDK version officially supported by Dataflow is the Python 2 series, and a warning is issued when you use the Python 3 series. However, in my experience it is extremely rare for this to cause problems, and I think it is safe to run in production. (That said, I cannot take responsibility for it, so please use it at your own discretion.)

Create setup.py

You will need a setup.py in order to save the pipeline as a template on Dataflow and run it. Here we describe the runtime dependencies. Specify `entry_points` according to your own package configuration.

setup.py


from setuptools import setup, find_packages

# Runtime dependencies installed on the Dataflow workers
REQUIRED_PACKAGES = [
    "apache-beam[gcp]==2.14.*",
]

setup(
    name='dataflow-sample',
    url='',
    author='',
    author_email='',
    version='0.1',
    install_requires=REQUIRED_PACKAGES,
    packages=find_packages(),
    include_package_data=True,
    entry_points=dict(console_scripts=[
        'sample=sample:main'
    ]),
    description='dataflow sample',
)

Set up the pipeline

Implement the main function that sets up and runs the Apache Beam pipeline. The required steps are as follows.

- Set the runner
- Create a Pipeline instance

setup_sample.py


import sys
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def setup(args):
    # DirectRunner runs the pipeline locally
    runner = "DirectRunner"
    return beam.Pipeline(runner, options=PipelineOptions(args))


def main():
    # Pass the command-line arguments (excluding the program name) as pipeline options
    pipeline = setup(sys.argv[1:])
    with pipeline as p:
        # Describe the pipeline here
        pass

Define options

To actually run on Dataflow, you need to set the various options defined in the SDK. The typical ones are listed below.

StandardOptions

| name | type | description |
| --- | --- | --- |
| streaming | boolean | Select streaming mode or batch mode |

SetupOptions

| name | type | description |
| --- | --- | --- |
| setup_file | string | Specify the path of setup.py |

GoogleCloudOptions

| name | type | description |
| --- | --- | --- |
| region | string | Specify the region to use |
| project | string | Specify the project ID to use |
| job_name | string | Specify the name of the job when it is executed (arbitrary value) |
| template_location | string | Specify the GCS path where the template is saved |

These options must be specified either in code or as command-line arguments at run time. When specified in code, it looks like the following.

options_sample.py


import apache_beam as beam
from apache_beam.options.pipeline_options import (
    PipelineOptions,
    GoogleCloudOptions,
    SetupOptions,
)


def option_setting(options: PipelineOptions) -> PipelineOptions:
    cloud_options = options.view_as(GoogleCloudOptions)
    cloud_options.region = "asia-northeast1"
    cloud_options.project = "your project id"

    setup_options = options.view_as(SetupOptions)
    setup_options.setup_file = "specify your setup.py path"
    return options


def setup(args):
    runner = "DirectRunner"
    options = option_setting(PipelineOptions(args))
    return beam.Pipeline(runner, options=options)

Basically, you obtain the options class you want to set with `PipelineOptions.view_as()`, and then set values on the properties you want to specify.

You can also create your own custom options if there are settings you need to pass at run time. The implementation simply inherits from PipelineOptions and overrides the required method.

custom_options_sample.py


from apache_beam.options.pipeline_options import PipelineOptions


class CustomOptions(PipelineOptions):

    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument(
            '--hoge',
            type=int,
            default=0,
            help='This is a custom value'
        )
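Reading the value back at run time is just another `view_as()` call. Below is a minimal usage sketch based on the `CustomOptions` class above (the `--hoge` argument and module names are just this article's example names):

custom_options_usage.py


import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

from custom_options_sample import CustomOptions  # the class defined above


def setup(args):
    options = PipelineOptions(args)
    # view_as() exposes the arguments defined in CustomOptions as attributes
    custom_options = options.view_as(CustomOptions)
    print(custom_options.hoge)  # e.g. passed on the command line as --hoge=10
    return beam.Pipeline("DirectRunner", options=options)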

Define a BigQuery to BigQuery pipeline

Let's define a pipeline that actually reads data from BigQuery and writes it back to BigQuery. As a simple example, let's implement a process that extracts only the id from the users table and inserts it into another table.

pipeline.py


import apache_beam as beam
from apache_beam.io.gcp.bigquery import BigQuerySource, BigQueryDisposition


def b2b_pipeline(p: beam.Pipeline):

    # SQL to execute against BigQuery
    query = "SELECT id, name, age FROM sample.users"

    _ = (p
        | "Read from BigQuery" >> beam.io.Read(BigQuerySource(query=query, use_standard_sql=True))
        | "Preprocess" >> beam.Map(lambda data: {"id": data["id"]})
        | "Write to BigQuery" >> beam.io.WriteToBigQuery(
                table="user_ids",
                dataset="sample",
                schema="id:INTEGER",
                create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=BigQueryDisposition.WRITE_APPEND,
            )
        )

Pipeline processing consists of three kinds of operations: input, intermediate transforms, and output. There are many more transforms than the ones introduced here, so refer to the Official Reference to customize your own pipeline.
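Wiring this pipeline into the main function from the setup section could look like the following minimal sketch (the module names follow this article's example file names):

run_pipeline.py


import sys

from pipeline import b2b_pipeline   # the pipeline defined above
from setup_sample import setup      # the setup function defined earlier


def main():
    pipeline = setup(sys.argv[1:])
    # Leaving the with-block runs the pipeline and waits for it to finish
    with pipeline as p:
        b2b_pipeline(p)


if __name__ == "__main__":
    main()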

About the execution environment

Let's run the implemented pipeline both locally and on GCP.

There are several environments to choose from when running Apache Beam.

- Run locally
- Run on Dataflow without saving a template
- Save a template on Dataflow and run it

| Use case | Runner | template_location |
| --- | --- | --- |
| Run locally | DirectRunner | None |
| Run on Dataflow | DataflowRunner | None |
| Run as a template on Dataflow | DataflowRunner | GCS path where the template is saved |

Saving a template allows you to store the pipeline in GCS and launch it from the console or the command line. This is very useful when you want to run the pipeline on a schedule.
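For illustration, switching the earlier setup function over to Dataflow and saving a template might look like this minimal sketch (the project ID and bucket path are placeholders to replace with your own):

run_on_dataflow.py


import apache_beam as beam
from apache_beam.options.pipeline_options import (
    PipelineOptions,
    GoogleCloudOptions,
    SetupOptions,
)


def setup_for_dataflow(args):
    options = PipelineOptions(args)

    cloud_options = options.view_as(GoogleCloudOptions)
    cloud_options.region = "asia-northeast1"
    cloud_options.project = "your-project-id"
    # When template_location is set, the pipeline is saved to GCS as a template
    # instead of being executed immediately
    cloud_options.template_location = "gs://your-bucket/templates/sample"

    # setup.py is needed so the workers can install the package dependencies
    options.view_as(SetupOptions).setup_file = "./setup.py"

    # DataflowRunner submits the job to Dataflow; DirectRunner runs it locally
    return beam.Pipeline("DataflowRunner", options=options)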

In closing

In this article, I introduced how to implement a pipeline with Apache Beam in Python and run it on Dataflow. I hope you find it helpful.
