Apache Beam (Dataflow) Practical Introduction [Python]

Introduction

This article is based on the contents of the Apache Beam Documentation (https://beam.apache.org/documentation/).

It implements a batch-processing program with the Apache Beam Python SDK and summarizes how to run it on Cloud Dataflow. It also touches on the basic concepts of Apache Beam, testing, and pipeline design.


Getting Started with Apache Beam SDK

The Apache Beam SDK is available in **Java**, **Python**, and **Go**, and provides the following **abstractions that simplify the mechanics of distributed processing**.

- **Pipeline:** Encapsulates the entire processing task (pipeline), including reading the input data, transforming it, and writing the output data.
- **PCollection:** An object that represents the dataset to be processed in a distributed manner. Normally you create a PCollection by reading data from an external data source, but you can also create one from in-memory data.
- **Transform:** Provides data transformation processing. Every Transform takes one or more PCollections as input, performs some processing on the elements of those PCollections, and outputs zero or more PCollections.
- **I/O Transform:** Provides Read/Write Transforms for reading data from and writing data to various external storage systems (GCS, BigQuery, etc.).
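To see how these concepts fit together, here is a minimal sketch (the file paths are placeholders, not part of the original example):

import apache_beam as beam

# Pipeline: encapsulates the whole processing task
with beam.Pipeline() as p:
    lines = p | beam.io.ReadFromText('input.txt')   # I/O (Read) Transform -> PCollection
    lengths = lines | beam.Map(len)                  # Transform: PCollection in, PCollection out
    lengths | beam.io.WriteToText('output.txt')      # I/O (Write) Transform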

Apache Beam runtime environment

Programs created with the Apache Beam SDK can run on distributed data processing systems such as the following. In Apache Beam, this execution environment is called a **Runner**.

- **DirectRunner:** Runs on the local machine (used for testing, etc.)
- **DataflowRunner:** Runs on Google Cloud Dataflow

This time, we will run it in two execution environments, DirectRunner and DataflowRunner.

Pipeline implementation

A typical (simple) Apache Beam program is created and run as follows.

  1. Create a **Pipeline object** and set the execution options.
  2. Use a **Read Transform** to read data from an external storage system or from in-memory data and **create a PCollection**.
  3. Apply **Transforms** to the PCollection. Transforms can transform the elements of a PCollection with arbitrary logic.
  4. Apply a **Write Transform** to write the PCollection produced by the Transforms to an external sink.

For this process flow, the pipeline would be as follows:

[Figure: simple pipeline flow: Read Transform → Transform → Write Transform]

Let's actually implement a simple pipeline like the one above in Python. The operating environment is assumed to be as follows.

- Python version: 2.7 or higher (2.x series) or 3.5 or higher (3.x series)
- Apache Beam version: 2.15.*

Install Apache Beam SDK

If you don't need any additional packages, install the SDK with the following command:

pip install apache-beam

This time, since we assume the pipeline will run on Dataflow (GCP), we also install the GCP extras package.

pip install apache-beam[gcp]

Completed code

Here is the completed code; each part is explained below.

pipeline.py


import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions


class MyOptions(PipelineOptions):
    """Custom options."""
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument(
            '--input',
            default='./input.txt',
            help='Input path for the pipeline')

        parser.add_argument(
            '--output',
            default='./output.txt',
            help='Output path for the pipeline')


class ComputeWordLength(beam.DoFn):
    """Conversion process to find the number of characters."""

    def __init__(self):
        pass

    def process(self, element):
        yield len(element)


def run():
    options = MyOptions()
    # options.view_as(StandardOptions).runner = 'DirectRunner'
    p = beam.Pipeline(options=options)

    (p
     | 'ReadFromText' >> beam.io.ReadFromText(options.input)  # Apply an I/O (Read) Transform and read data from the path given in the options
     | 'ComputeWordLength' >> beam.ParDo(ComputeWordLength())  # Apply a Transform
     | 'WriteToText' >> beam.io.WriteToText(options.output))  # Apply an I/O (Write) Transform and write data to the path given in the options

    p.run()


if __name__ == '__main__':
    run()

Pipeline

The Pipeline object **encapsulates all of your data processing tasks**. An Apache Beam program typically creates a Pipeline object first, then uses it to create PCollections and apply Transforms.

Creating a Pipeline

To use Apache Beam, you must first create an instance of the SDK's Pipeline class (usually inside the main function). When you create the Pipeline, you also set the execution options.

The following code is an example of creating an instance of Pipeline.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


options = PipelineOptions()  # Execution options
p = beam.Pipeline(options=options)

PipelineOptions settings

You can use PipelineOptions to set the runner that executes the pipeline and the **specific options required by the selected runner**. For example, it may contain information such as the project ID and locations where files are stored.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions


options = PipelineOptions()
options.view_as(StandardOptions).runner = 'DirectRunner'  # Specify the runner

p = beam.Pipeline(options=options)

There are two ways to set options: programmatically, or by passing them as command line arguments. An example of the latter is shown in Run in Cloud Dataflow below.
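As a rough sketch of the two styles (the flag values here are only placeholders):

import sys
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

# 1. Set options programmatically
options = PipelineOptions()
options.view_as(StandardOptions).runner = 'DirectRunner'

# 2. Parse options from command line arguments, e.g. --runner=DataflowRunner --project=...
options = PipelineOptions(flags=sys.argv[1:])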

Add custom options

You can add **custom options** in addition to the standard PipelineOptions. The following example adds options that specify the input and output paths. Custom options also let you specify a description and a default value that are displayed when the user passes --help on the command line.

You can create custom options by ** inheriting from PipelineOptions **.

class MyOptions(PipelineOptions):
    """Custom options."""
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument(
            '--input',  # Option name
            default='./input.txt',  # Default value
            help='Input path for the pipeline')  # Description

        parser.add_argument(
            '--output',
            default='./output.txt',
            help='Output path for the pipeline')

Pass the options you created as follows:

p = beam.Pipeline(options=MyOptions())

To set a custom option to a value other than the default value, pass a value from the command line argument as follows:

--input=value --output=value

PCollection

A PCollection is an **object that represents the dataset to be processed in a distributed manner**. In an Apache Beam pipeline, Transforms use PCollections as input and output, so if you want to process data in your pipeline, you need to create a PCollection.

After creating a Pipeline object, you must first somehow create at least one PCollection.

Creating a PCollection

Use an I/O Transform to read data from an external source, or create a PCollection from in-memory data. The latter is mainly useful for testing and debugging.

Create a PCollection from an external source

Use an I/O Transform to create a PCollection from an external source. To read the data, apply the Read Transform provided by each I/O Transform to the Pipeline object.

Here's how to apply a Read Transform to a Pipeline to create a PCollection:

lines = p | 'ReadFromText' >> beam.io.ReadFromText('gs://some/input-data.txt')

Create PCollection from in-memory

Use Create Transform to create a PCollection from in-memory.

lines = (p | 'ReadFromInMemory' >> beam.Create([
    'To be, or not to be: that is the question: ',
    'Whether \'tis nobler in the mind to suffer ',
    'The slings and arrows of outrageous fortune, ',
    'Or to take arms against a sea of troubles, ']))

Transform

A Transform provides a **general processing framework**. A Transform is applied to each element of the input PCollection.

The Apache Beam SDK provides a variety of Transforms that you can apply to your PCollections. These include generic **Core transforms** such as ParDo and Combine, as well as **Composite transforms** that combine one or more Core transforms. See the Apache Beam documentation for the full list of available Transforms.
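For example, besides ParDo, commonly used built-in Transforms include beam.Map, beam.Filter, and beam.CombineGlobally. A minimal sketch (the data is made up for illustration):

import apache_beam as beam

with beam.Pipeline() as p:
    numbers = p | beam.Create([1, 2, 3, 4, 5])
    squared = numbers | beam.Map(lambda x: x * x)         # 1-to-1 element transform
    evens = squared | beam.Filter(lambda x: x % 2 == 0)   # keep only matching elements
    total = evens | beam.CombineGlobally(sum)             # aggregate to a single value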

Apply Transform

Each Transform in the Apache Beam SDK supports the pipe operator |, so you can apply a Transform by piping the input PCollection into it.

[Output PCollection] = [Input PCollection] | [Transform]

You can also chain Transforms to create a pipeline as follows:

[Output PCollection] = ([Initial Input PCollection] 
                             | [First Transform]
                             | [Second Transform]
                             | [Third Transform])

This chained flow is the same as the one in the completed code above, so the pipeline takes the following shape.

[Figure: chained pipeline: ReadFromText → ComputeWordLength → WriteToText]

A Transform creates a new PCollection without modifying its input. **A Transform never changes the input PCollection**: a PCollection is immutable by definition. Therefore, you can apply multiple Transforms to the same PCollection, branching the pipeline.

[Output PCollection] = [Initial Input PCollection]

[Output PCollection A] = [Output PCollection] | [Transform A]
[Output PCollection B] = [Output PCollection] | [Transform B]

The shape of this pipeline looks like this:

[Figure: branching pipeline: one PCollection fed into Transform A and Transform B]
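As a concrete sketch of such branching (the data and transforms here are hypothetical), two Transforms applied to the same PCollection:

import apache_beam as beam

with beam.Pipeline() as p:
    words = p | beam.Create(['apple', 'banana', 'cherry'])

    # Two independent branches from the same (immutable) PCollection
    upper = words | 'ToUpper' >> beam.Map(str.upper)
    lengths = words | 'ToLength' >> beam.Map(len)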

I/O Transform

When you create a pipeline, you often need to read data from an external source, such as a file or a database. Similarly, the pipeline can output its data to an external storage system.

The Apache Beam SDK provides I/O Transforms for common data storage types (https://beam.apache.org/documentation/io/built-in/). If you want to read or write a data store that is not supported, you need to implement your own I/O Transform.

Data reading

A Read Transform reads data from an external source and turns it into a PCollection. You can use a Read Transform at any point while building the pipeline, but it is generally applied first.

lines = pipeline | beam.io.ReadFromText('gs://some/input-data.txt')

Writing data

A Write Transform writes the data in a PCollection to an external data source. To output the results of your pipeline, you will most likely use a Write Transform at the end of the pipeline.

output | beam.io.WriteToText('gs://some/output-data')

Reading from multiple files

Many Read Transforms support reading from multiple input files matching a glob pattern. The following example uses the glob operator (*) to read all input files at the specified location with the prefix "input-" and the suffix ".csv".

lines = p | 'ReadFromText' >> beam.io.ReadFromText('path/to/input-*.csv')

Writing to multiple files

A Write Transform writes to multiple files by default. The file name you pass is used as a prefix for all output files.

The following example writes multiple files to one location. Each file is prefixed with "numbers" and suffixed with ".csv".

output | 'WriteToText' >> beam.io.WriteToText('/path/to/numbers', file_name_suffix='.csv')

Pipeline execution

Now let's run the pipeline using the completed code above, locally and then on Cloud Dataflow.

Prepare a text file containing the following lines as input.

input.txt


good morning.
good afternoon.
good evening.

Run locally

To run the pipeline locally, set DirectRunner as the runner in PipelineOptions. DirectRunner is the default, so you don't need to specify the runner explicitly unless you have a specific reason to.

Run the following command from the command line, adjusting the input and output paths for your environment.

python pipeline.py --input=./input.txt --output=./output.txt

This example pipeline counts the number of characters in each line, so the following result is output. By default, beam.io.WriteToText appends a string like 00000-of-00001 to the file name so that output can be distributed across multiple files. If you want to write to a single file, pass an empty shard_name_template argument (see the snippet after the output listing below).

output.txt-00000-of-00001


13
15
13
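As mentioned above, one way to get a single output file is to pass an empty shard_name_template (a minimal sketch; output is assumed to be the PCollection being written):

# Write everything to a single file named exactly output.txt
output | 'WriteToText' >> beam.io.WriteToText('./output.txt', shard_name_template='')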

Run in Cloud Dataflow

Cloud Dataflow is a fully managed service provided by GCP (Google Cloud Platform) that processes data in streaming or batch mode. Users can process huge amounts of data using virtually unlimited capacity on a pay-as-you-go basis, without worrying about operating infrastructure such as servers.

Running a pipeline on Cloud Dataflow creates a job in your GCP project that uses Compute Engine and Cloud Storage resources. To use Cloud Dataflow, enable the **Dataflow API** in your GCP project.

A small modification is required to run the completed code on Cloud Dataflow. Modify it as follows.

pipeline.py


import apache_beam as beam
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import WorkerOptions


GCP_PROJECT_ID = 'my-project-id'
GCS_BUCKET_NAME = 'gs://my-bucket-name'
JOB_NAME = 'compute-word-length'


class MyOptions(PipelineOptions):
    """Custom options."""
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument(
            '--input',
            default='{}/input.txt'.format(GCS_BUCKET_NAME),  # Put input.txt in GCS
            help='Input for the pipeline')

        parser.add_argument(
            '--output',
            default='{}/output.txt'.format(GCS_BUCKET_NAME),  # Output to GCS
            help='Output for the pipeline')


class ComputeWordLength(beam.DoFn):
    """Conversion process to find the number of characters."""

    def __init__(self):
        pass

    def process(self, element):
        yield len(element)


def run():
    options = MyOptions()

    # GCP options
    google_cloud_options = options.view_as(GoogleCloudOptions)
    google_cloud_options.project = GCP_PROJECT_ID  # Project ID
    google_cloud_options.job_name = JOB_NAME  # Arbitrary job name
    google_cloud_options.staging_location = '{}/binaries'.format(GCS_BUCKET_NAME)  # GCS path for staging files
    google_cloud_options.temp_location = '{}/temp'.format(GCS_BUCKET_NAME)  # GCS path for temporary files

    # Worker options
    options.view_as(WorkerOptions).autoscaling_algorithm = 'THROUGHPUT_BASED'  # Enable autoscaling

    # Standard options
    options.view_as(StandardOptions).runner = 'DataflowRunner'  # Specify the Dataflow runner

    p = beam.Pipeline(options=options)

    (p
     | 'ReadFromText' >> beam.io.ReadFromText(options.input)
     | 'ComputeWordLength' >> beam.ParDo(ComputeWordLength())
     | 'WriteToText' >> beam.io.WriteToText(options.output, shard_name_template=""))

    p.run()
    # p.run().wait_until_finish()  # Block until the pipeline completes


if __name__ == '__main__':
    run()

See the Dataflow documentation for more execution options (https://cloud.google.com/dataflow/docs/guides/specifying-exec-params?hl=ja#-cloud-dataflow--). For streaming pipelines, the streaming option must be set to true.
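For example, streaming mode can be turned on through StandardOptions (a minimal sketch, assuming options is the PipelineOptions instance from the code above):

from apache_beam.options.pipeline_options import StandardOptions

options.view_as(StandardOptions).streaming = True  # run the pipeline in streaming mode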

This can also be executed with a similar command.

python pipeline.py --input=gs://my-bucket-name/input.txt --output=gs://my-bucket-name/output.txt

Options set in the program can also be passed from command line arguments like this.

python pipeline.py \
  --input=gs://my-bucket-name/input.txt \
  --output=gs://my-bucket-name/output.txt \
  --runner=DataflowRunner \
  --project=my-project-id \
  --temp_location=gs://my-bucket-name/tmp/ \
  ...

You can monitor the pipeline from the Dataflow service in the GCP console. The UI looks like the following, and the result is written to the specified path.

[Screenshot: Dataflow job graph in the GCP console]

If you want to run this kind of Dataflow batch processing on a regular schedule, **Dataflow templates** are convenient. See the Dataflow documentation for more information.

Pipeline testing

When testing pipelines, **local unit tests can often save a lot of debugging time and effort** compared to debugging remote executions such as Dataflow jobs.

Install the following to resolve the test dependencies:

pip install nose

Implementation example

To test a pipeline, use the TestPipeline object. Instead of reading the input from an external source, use `apache_beam.Create` to create a PCollection from in-memory data, and compare the output with `assert_that`.

test_pipeline.py


from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to

from src.pipeline import ComputeWordLength


class PipelineTest(TestCase):

    def test_pipeline(self):
        expected = [
            13,
            15,
            13
        ]

        inputs = [
            'good morning.',
            'good afternoon.',
            'good evening.'
        ]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.ParDo(ComputeWordLength()))

            assert_that(actual, equal_to(expected))
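The test can then be run with nose (the module path src.pipeline in the import above assumes a corresponding project layout), for example:

nosetests test_pipeline.py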

Pipeline design

Above, in the section on applying Transforms, we already briefly explained the design (processing flow) of a simple pipeline and a branching pipeline. Here are some other common pipeline designs.

A pipeline with a Transform that produces multiple PCollections

[Figure: a Transform producing multiple output PCollections]

This can be achieved using Apache Beam's Additional outputs (https://beam.apache.org/documentation/programming-guide/#additional-outputs) feature.

import apache_beam as beam
from apache_beam import pvalue


class ExtractWord(beam.DoFn):

    def process(self, element):
        if element.startswith('A'):
            yield pvalue.TaggedOutput('a', element)  # Tag elements starting with 'A' with the tag 'a'
        elif element.startswith('B'):
            yield pvalue.TaggedOutput('b', element)  # Tag elements starting with 'B' with the tag 'b'


mixed_col = db_row_col | beam.ParDo(ExtractWord()).with_outputs()

mixed_col.a | beam.ParDo(...)  # Each output can be accessed by its tag name
mixed_col.b | beam.ParDo(...)

A pipeline with a Transform that joins PCollections

[Figure: a Transform merging multiple PCollections into one]

This can be achieved by using Flatten.

col_list = (a_col, b_col) | beam.Flatten()
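A slightly fuller sketch of Flatten (the data is made up; the PCollections being merged should hold elements of the same type):

import apache_beam as beam

with beam.Pipeline() as p:
    a_col = p | 'CreateA' >> beam.Create(['a1', 'a2'])
    b_col = p | 'CreateB' >> beam.Create(['b1', 'b2'])

    merged = (a_col, b_col) | beam.Flatten()  # one PCollection containing all elements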

Pipeline with multiple input sources

[Figure: a pipeline reading from multiple input sources]

You can create a PCollection from each input source and join them with CoGroupByKey or a similar transform. Note that CoGroupByKey expects each input PCollection to consist of (key, value) pairs.

user_address = p | beam.io.ReadFromText(...)
user_order = p | beam.io.ReadFromText(...)

joined_col = (user_address, user_order) | beam.CoGroupByKey()

joined_col | beam.ParDo(...)
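A minimal in-memory sketch of such a join (the data is made up for illustration; the dict form of CoGroupByKey is used here):

import apache_beam as beam

with beam.Pipeline() as p:
    user_address = p | 'CreateAddresses' >> beam.Create([('user1', 'Tokyo'), ('user2', 'Osaka')])
    user_order = p | 'CreateOrders' >> beam.Create([('user1', 'book'), ('user2', 'pen')])

    # Group the values from both PCollections by key
    joined = {'address': user_address, 'order': user_order} | beam.CoGroupByKey()
    # e.g. ('user1', {'address': ['Tokyo'], 'order': ['book']})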

Other useful functions

You may also want to know the following features so that you can handle various use cases.

Composite transforms

Composite transforms combine multiple Transforms (ParDo, Combine, GroupByKey, ...). Nesting Transforms this way makes your code more modular and easier to understand.

Implementation example

To implement a composite transform, extend the PTransform class and override the expand method.

"""A pipeline that counts the number of words in a sentence."""
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class ComputeWordCount(beam.PTransform):
    """Composite transforms counting the number of words."""

    def __init__(self):
        pass

    def expand(self, pcoll):
        return (pcoll
                | 'SplitWithHalfSpace' >> beam.Map(lambda element: element.split(' '))
                | 'ComputeArraySize' >> beam.Map(lambda element: len(element)))


def run():
    p = beam.Pipeline(options=PipelineOptions())

    inputs = ['There is no time like the present.', 'Time is money.']

    (p
     | 'Create' >> beam.Create(inputs)
     | 'ComputeWordCount' >> ComputeWordCount()
     | 'WriteToText' >> beam.io.WriteToText('Output destination path'))

    p.run()

if __name__ == '__main__':
    run()
    

output


7
3

Side inputs

Side inputs are a feature that lets you pass additional inputs (side inputs) to a Transform in addition to the normal input (main input) PCollection.

Implementation example

"""A pipeline that outputs only strings with above average number of characters."""
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam import pvalue


class FilterMeanLengthFn(beam.DoFn):
    """Filter strings with above average number of characters."""

    def __init__(self):
        pass

    # mean_word_length is a side input
    def process(self, element, mean_word_length):
        if len(element) >= mean_word_length:
            yield element


def run():
    p = beam.Pipeline(options=PipelineOptions())

    inputs = ["good morning.", "good afternoon.", "good evening."]

    lines = p | 'Create' >> beam.Create(inputs)

    # Side input: the mean number of characters per element
    word_lengths = lines | 'ComputeWordLength' >> beam.Map(len)
    mean_word_length = word_lengths | 'ComputeMeanWordLength' >> beam.CombineGlobally(beam.combiners.MeanCombineFn())

    # Main input: pass the side input as the second argument of ParDo
    output = (lines
              | 'FilterMeanLength' >> beam.ParDo(FilterMeanLengthFn(), pvalue.AsSingleton(mean_word_length))
              | 'WriteToText' >> beam.io.WriteToText('Output destination path'))

    p.run().wait_until_finish()


if __name__ == '__main__':
    run()

The number of characters in "good morning.", "good afternoon.", and "good evening." is 13, 15, and 13 respectively, and the average is about 13.67, so the output is as follows.

output


good afternoon.

What's happening in the pipeline?

This section briefly describes what is happening inside the pipeline.

Serialization and communication

One of the most costly operations in distributed pipeline processing is **serializing elements and communicating them between machines**. The Apache Beam runner serializes the elements of a PCollection, for example in order to communicate them between machines. Elements are communicated between one Transform and the next using the following techniques:

  1. Serializing the elements and routing them to a worker
  2. Serializing the elements and redistributing them to multiple workers
  3. When side inputs are used, serializing the elements and broadcasting them to all workers
  4. When a Transform and the next Transform are executed by the same worker, communicating the elements in-memory (avoiding serialization reduces communication cost)

Bundling and persistence

Apache Beam focuses on embarrassingly parallel problems. Because Apache Beam attaches great importance to processing elements in parallel, it is not good at expressing operations such as **assigning sequence numbers to each element of a PCollection**: such algorithms are much more likely to have scalability issues.

**Processing all elements in parallel** also has some drawbacks, for example when writing elements to an output destination: output processing cannot write every element in parallel at once.

Therefore, the Apache Beam runner does not process all elements at the same time; instead, it processes the elements of a PCollection in bundles. With streaming, elements tend to be bundled in small units; with batch processing, in larger units.

Parallel processing

Parallel processing in Transform

When running a single ParDo, the Apache Beam runner may split the elements of a PCollection into two bundles.

[Figure: a PCollection split into two bundles]

When the ParDo is executed, workers process the two bundles in parallel, as shown below.

[Figure: two workers processing the two bundles in parallel]

Since a single element cannot be split, the maximum parallelism of a Transform depends on the number of elements in the PCollection. In this case, the maximum parallelism is **9**, as can be seen from the figure.

Parallel processing between Transforms

ParDos can be dependently parallel. For example, ParDo1 and ParDo2 are dependently parallel if the output of ParDo1 must be processed by the same worker, as follows:

[Figure: ParDo1 and ParDo2 executed as dependently parallel steps]

Worker1 runs ParDo1 on the elements of Bundle A, which becomes Bundle C. Then ParDo2 is executed on the elements of Bundle C. Similarly, Worker2 runs ParDo1 on the elements of Bundle B, which becomes Bundle D. Then ParDo2 is executed on the elements of Bundle D.

[Figure: each worker running ParDo1 and then ParDo2 on its own bundle]

By executing ParDos in this way, the Apache Beam runner can avoid redistributing elements among workers, which saves communication cost. However, **the maximum parallelism now depends on the maximum parallelism of the first ParDo in the dependently parallel chain.**

Behavior when a failure occurs

Behavior in the event of a failure in the Transform

If processing of any element in a bundle fails, the entire bundle fails, and the processing must be retried (otherwise the entire pipeline fails).

In the following example, Worker1 successfully processes all five elements of Bundle A. Worker2 processes the four elements of Bundle B: the first two elements are processed successfully, but the third fails.

The Apache Beam runner then retries all the elements of Bundle B, and the second attempt completes successfully. As shown, **retries do not always occur on the same worker as the original processing attempt.**

[Figure: Bundle B retried in full after a failed element, possibly on a different worker]

Behavior when a failure occurs between Transforms

If elements cannot be processed in ParDo2 after ParDo1 has run, these two Transforms fail together.

In the following example, Worker2 successfully executes ParDo1 on all elements of Bundle B. However, ParDo2 fails because it cannot handle the elements of Bundle D.

As a result, the Apache Beam runner must discard the output of ParDo2 and retry. In that case, the output bundle of ParDo1 must also be discarded, and **all elements of the bundle must be retried.**

[Figure: both ParDo1 and ParDo2 outputs discarded and the bundle retried]

Summary

I have summarized what I learned from the Apache Beam documentation. Please point out any mistakes!
