[JAVA] Introduction to Apache Beam with Google Cloud Dataflow (over 2.0.x series) ~ Combine ~


This article describes the basic usage of Combine, one of the five Core Transforms of Apache Beam. The other Core Transforms and the basics of Apache Beam 2.0.x are covered in the following articles.

Apache Beam 2.0.x with Google Cloud Dataflow starting with IntelliJ and Gradle (Qiita)

Introduction to Apache Beam with Cloud Dataflow (over 2.0.0 series) ~ Basic part ~ ParDo ~ (Qiita)

Introduction to Apache Beam with Google Cloud Dataflow (over 2.0.x series) ~ Basic GroupByKey Edition ~ (Qiita)

This article is written with reference to the following two official documents.

Beam Programming Guide

Combine collection and value|Cloud Dataflow documentation| Google Cloud Platform

Two roles of Combine

Combine combines (merges) the elements that exist in a PCollection. I think of it as roughly the Reduce step in Map / Shuffle / Reduce.

There are two main ways to combine: "combining the elements that exist in one PCollection to generate one value" and "combining the elements of the Value part of a PCollection grouped by Key to generate one value per key". Each is described below.

How to combine elements that exist in one PCollection to generate one value


This form combines all the elements in a PCollection. Note that this is different from ParDo: ParDo applies some processing to each element in the PCollection individually, whereas Combine merges the elements of the PCollection together.
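The difference can be sketched in plain Java, with no Beam API involved. This is only an analogy: the lists stand in for a PCollection, and the class and method names (ParDoVersusCombineSketch, convertEach, combineAll) are made up for illustration.

```java
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java analogy only: the lists stand in for a PCollection,
// and no Beam API is used here.
class ParDoVersusCombineSketch {

    // ParDo-like: each element is processed on its own; N inputs give N outputs.
    static List<String> convertEach(List<Integer> elements) {
        return elements.stream()
                .map(String::valueOf)
                .collect(Collectors.toList());
    }

    // Combine-like: all elements are merged into one value; N inputs give 1 output.
    static int combineAll(List<Integer> elements) {
        return elements.stream().reduce(0, Integer::sum);
    }

    public static void main(String[] args) {
        List<Integer> elements = List.of(1, 2, 3);
        System.out.println(convertEach(elements)); // prints [1, 2, 3]
        System.out.println(combineAll(elements));  // prints 6
    }
}
```

The ParDo-like method keeps the number of elements unchanged, while the Combine-like method always collapses them into a single value.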

For example, the following combines the elements in a PCollection to generate one value.


PCollection<Integer> sum = pCollection.apply(Sum.integersGlobally());

At first glance Combine does not seem to appear here, but Sum.integersGlobally() wraps Combine.globally. The actual implementation of Sum.integersGlobally() is shown below.


public static Combine.Globally<Integer, Integer> integersGlobally() {
  return Combine.globally(Sum.ofIntegers());
}

Reference: API Reference
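To give a feel for what a sum combiner does behind Combine.globally, here is a minimal plain-Java model. It is not Beam's class. It only mirrors the four steps of Beam's general CombineFn contract (create an accumulator, add inputs, merge accumulators, extract the output), and the class name SumCombinerSketch and the helper accumulate are hypothetical.

```java
import java.util.List;

// Hypothetical plain-Java model of a sum combiner; it is not Beam's class.
// Splitting the work into these four steps is what lets a runner sum
// partial chunks separately and merge the partial results afterwards.
class SumCombinerSketch {

    // Start value of a partial sum.
    static int createAccumulator() {
        return 0;
    }

    // Fold one input element into a partial sum.
    static int addInput(int accumulator, int input) {
        return accumulator + input;
    }

    // Merge the partial sums computed for different chunks.
    static int mergeAccumulators(List<Integer> accumulators) {
        int merged = createAccumulator();
        for (int accumulator : accumulators) {
            merged += accumulator;
        }
        return merged;
    }

    // Turn the final accumulator into the output value.
    static int extractOutput(int accumulator) {
        return accumulator;
    }

    // Helper (hypothetical): sums one chunk of the input.
    static int accumulate(List<Integer> chunk) {
        int accumulator = createAccumulator();
        for (int input : chunk) {
            accumulator = addInput(accumulator, input);
        }
        return accumulator;
    }

    public static void main(String[] args) {
        int left = accumulate(List.of(1, 2, 3, 4, 5));
        int right = accumulate(List.of(6, 7, 8, 9, 10));
        System.out.println(extractOutput(mergeAccumulators(List.of(left, right)))); // prints 55
    }
}
```

Because addition is associative and commutative, the result does not depend on how the input is split into chunks.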

withoutDefaults()

If you want the output to be empty when an empty PCollection is given as input, add withoutDefaults().


PCollection<Integer> sum = integerPCollection.apply(Sum.integersGlobally().withoutDefaults());
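The effect on empty input can be modeled in plain Java. This is a sketch under assumptions, not Beam code: lists stand in for PCollections in the global window, the names EmptyInputSketch, sumWithDefault and sumWithoutDefaults are hypothetical, and the default value 0 reflects that the sum of no elements is zero.

```java
import java.util.List;

// Plain-Java model of the two behaviours in the global window.
// Lists stand in for PCollections; this is not Beam code.
class EmptyInputSketch {

    // Models Sum.integersGlobally(): always exactly one output element.
    // For empty input that element is the sum of nothing, i.e. 0.
    static List<Integer> sumWithDefault(List<Integer> input) {
        int sum = input.stream().mapToInt(Integer::intValue).sum();
        return List.of(sum);
    }

    // Models Sum.integersGlobally().withoutDefaults(): empty in, empty out.
    static List<Integer> sumWithoutDefaults(List<Integer> input) {
        if (input.isEmpty()) {
            return List.of();
        }
        return sumWithDefault(input);
    }

    public static void main(String[] args) {
        List<Integer> empty = List.of();
        System.out.println(sumWithDefault(empty));     // prints [0]
        System.out.println(sumWithoutDefaults(empty)); // prints []
    }
}
```

For non-empty input both variants produce the same single value; they differ only when there is nothing to combine.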

Differences in behavior between Global Window and non-Global Window

With the Global Window, the default behavior is to return a PCollection containing one item.

With a non-Global Window, on the other hand, this default behavior is not applied, so you must specify one of the following options when using Combine. The official documentation explains this clearly, so I quote it below. (At the time of writing, the documentation for Apache Beam 2.0.x did not yet include this description, so the quote comes from the official Google Cloud Dataflow 1.9 documentation.)

Specify .withoutDefaults. In this case, windows that are empty in the input PCollection are also empty in the output PCollection.

Specify .asSingletonView. In this case, the output is immediately converted to a PCollectionView, which provides a default value for each empty window when it is used as a side input. In general, this option is only needed when the result of the pipeline's Combine is used as a side input later in the pipeline.

Quote source: Combine collection and value|Cloud Dataflow documentation| Google Cloud Platform
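The .withoutDefaults behavior for non-global windows can be pictured with a plain-Java sketch. The assumptions here are mine and are not in the quoted documentation: elements are (timestamp in seconds, value) pairs, windows are fixed-size and identified by their start time, and the names PerWindowSumSketch and sumPerFixedWindow are hypothetical. No Beam API is used.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of a per-window sum without defaults.
// Each input entry is (timestamp in seconds, value); this is not Beam code.
class PerWindowSumSketch {

    // Returns window start time -> sum of the values that fall in that window.
    // A window that receives no element never gets an entry, which mirrors
    // "empty in the input, empty in the output".
    static Map<Long, Integer> sumPerFixedWindow(
            List<Map.Entry<Long, Integer>> elements, long windowSizeSeconds) {
        Map<Long, Integer> sums = new TreeMap<>();
        for (Map.Entry<Long, Integer> element : elements) {
            long timestamp = element.getKey();
            long windowStart = timestamp - timestamp % windowSizeSeconds;
            sums.merge(windowStart, element.getValue(), Integer::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        List<Map.Entry<Long, Integer>> elements = List.of(
                Map.entry(5L, 1), Map.entry(30L, 2), Map.entry(130L, 4));
        // The window starting at 60 has no elements, so it has no output either.
        System.out.println(sumPerFixedWindow(elements, 60L)); // prints {0=3, 120=4}
    }
}
```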

Writing the code

Each step is explained in a comment in the code. To prioritize understanding, I avoid method chaining as much as possible, so the code is somewhat verbose.

package com.company;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.PCollection;

/**
 * Main class
 */
public class Main {
    /**
     * Function object
     * Converts each element from String to Integer
     */
    static class TransformTypeFromStringToInteger extends DoFn<String, Integer> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            // Convert the element from String to Integer and output it
            c.output(Integer.parseInt(c.element()));
        }
    }

    /**
     * Function object
     * Converts each element from Integer to String
     */
    static class TransformTypeFromIntegerToString extends DoFn<Integer, String> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            // Print the element, then convert it from Integer to String and output it
            System.out.println(c.element());
            c.output(String.valueOf(c.element()));
        }
    }


    /**
     * Input data path
     */
    private static final String INPUT_FILE_PATH = "./sample.txt";
    /**
     * Output data path
     */
    private static final String OUTPUT_FILE_PATH = "./result.txt";

    /**
     * Main method
     * Method chaining is avoided as much as possible for ease of understanding,
     * so the code is somewhat verbose.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Create the Pipeline with default options
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());

        // Read from the file
        PCollection<String> lines = pipeline.apply(TextIO.read().from(INPUT_FILE_PATH));
        // Convert each line that was read from String to Integer
        PCollection<Integer> integerPCollection = lines.apply(ParDo.of(new TransformTypeFromStringToInteger()));
        // Sum all elements of the PCollection with Combine globally
        // withoutDefaults() makes the output empty when the input PCollection is empty
        PCollection<Integer> sum = integerPCollection.apply(Sum.integersGlobally().withoutDefaults());
        // Convert the PCollection<Integer> sum from Integer to String
        PCollection<String> sumString = sum.apply(ParDo.of(new TransformTypeFromIntegerToString()));
        // Write to the file
        sumString.apply(TextIO.write().to(OUTPUT_FILE_PATH));

        // Run the pipeline and wait for it to finish
        pipeline.run().waitUntilFinish();
    }
}

The same code using method chaining

This version is much cleaner.

package com.company;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;


/**
 * Main class
 */
public class Main {
    /**
     * Function object
     * Converts each element from String to Integer
     */
    static class TransformTypeFromStringToInteger extends DoFn<String, Integer> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            // Convert the element from String to Integer and output it
            c.output(Integer.parseInt(c.element()));
        }
    }

    /**
     * Function object
     * Converts each element from Integer to String
     */
    static class TransformTypeFromIntegerToString extends DoFn<Integer, String> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            // Print the element, then convert it from Integer to String and output it
            System.out.println(c.element());
            c.output(String.valueOf(c.element()));
        }
    }


    /**
     * Input data path
     */
    private static final String INPUT_FILE_PATH = "./sample.txt";
    /**
     * Output data path
     */
    private static final String OUTPUT_FILE_PATH = "./result.txt";

    /**
     * Main method
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        //Pipeline generation
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());

        //Processing part
        pipeline.apply(TextIO.read().from(INPUT_FILE_PATH))
                .apply(ParDo.of(new TransformTypeFromStringToInteger()))
                .apply(Sum.integersGlobally().withoutDefaults())
                .apply(ParDo.of(new TransformTypeFromIntegerToString()))
                .apply(TextIO.write().to(OUTPUT_FILE_PATH));

        //Run
        pipeline.run().waitUntilFinish();
    }
}

Input file

sample.txt


1
2
3
4
5
6
7
8
9
10

Execution result

The file result.txt-00000-of-00001 is output. Its contents are as follows.

55

In other words, the pipeline computes the following:

$$\sum_{k=1}^{10} k = 55$$

PerKey

Applying GroupByKey produces elements of the form KV<K, Iterable<V>>, that is, a key paired with the collection of its values. For example:

Java [1, 2, 3]

Combine PerKey combines the Iterable<V> part of each KV<K, Iterable<V>> into one value per key. Applying it to the GroupByKey result above gives:

Java [6]

All the elements in the Iterable<V> of each key have been combined into a single value.
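The two steps can be sketched in plain Java. This is only an analogy with no Beam API: maps and lists stand in for the keyed PCollection, and the names CombinePerKeySketch, groupByKey and sumPerKey are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java analogy of GroupByKey followed by a per-key sum.
// Maps and lists stand in for PCollections; this is not Beam code.
class CombinePerKeySketch {

    // GroupByKey-like step: collect the values of each key into one list.
    static Map<String, List<Integer>> groupByKey(List<Map.Entry<String, Integer>> pairs) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            grouped.computeIfAbsent(pair.getKey(), key -> new ArrayList<>())
                    .add(pair.getValue());
        }
        return grouped;
    }

    // Combine-per-key-like step: reduce each key's values to a single sum.
    static Map<String, Integer> sumPerKey(Map<String, List<Integer>> grouped) {
        Map<String, Integer> sums = new TreeMap<>();
        grouped.forEach((key, values) ->
                sums.put(key, values.stream().mapToInt(Integer::intValue).sum()));
        return sums;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> pairs = List.of(
                Map.entry("Java", 1), Map.entry("Java", 2), Map.entry("Java", 3));
        Map<String, List<Integer>> grouped = groupByKey(pairs);
        System.out.println(grouped);            // prints {Java=[1, 2, 3]}
        System.out.println(sumPerKey(grouped)); // prints {Java=6}
    }
}
```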

Writing the code

Each step is explained in a comment in the code. To prioritize understanding, I avoid method chaining as much as possible, so the code is somewhat verbose.

package com.company;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

/**
 * Main class
 */
public class Main {
    /**
     * Function object
     * Splits a given "String str,String num" line on ","
     * and converts num to Integer to build a {@code KV<String, Integer>}
     */
    static class SplitWordsAndMakeKVFn extends DoFn<String, KV<String, Integer>> {
        @ProcessElement
        // ProcessContext is the object that represents the input
        // The Beam SDK supplies it, so you do not have to define it yourself
        public void processElement(ProcessContext c) {
            // Split on ","
            String[] words = c.element().split(",");
            // Use words[0] as K and words[1], converted to Integer, as V
            c.output(KV.of(words[0], Integer.parseInt(words[1])));
        }
    }


    /**
     * Function object
     * Converts {@code KV<String, Integer>} to String
     */
    static class TransTypeFromKVAndMakeStringFn extends DoFn<KV<String, Integer>, String> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            // Convert the input to String
            c.output(String.valueOf(c.element()));
        }
    }


    /**
     * Input data path
     */
    private static final String INPUT_FILE_PATH = "./sample.txt";
    /**
     * Output data path
     */
    private static final String COMBINE_OUTPUT_FILE_PATH = "./src/main/resources/combine_result/result.csv";

    /**
     * Main method
     * Method chaining is avoided as much as possible for ease of understanding,
     * so the code is somewhat verbose.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // First, create the options to set on the Pipeline
        // This time the pipeline runs locally, so DirectRunner is used
        // In local mode DirectRunner is already the default, so no runner needs to be configured
        PipelineOptions options = PipelineOptionsFactory.create();

        // Create the Pipeline from the options
        Pipeline pipeline = Pipeline.create(options);

        // Read the input data and create a PCollection (a set of data in the pipeline) from it
        PCollection<String> lines = pipeline.apply(TextIO.read().from(INPUT_FILE_PATH));

        // Split each "String str,String num" line on ",", convert num to Integer, and build a KV<String, Integer>
        PCollection<KV<String, Integer>> kvCounter = lines.apply(ParDo.of(new SplitWordsAndMakeKVFn()));

        // Combine PerKey performs a GroupByKey as part of its operation
        PCollection<KV<String, Integer>> sumPerKey = kvCounter
                .apply(Sum.integersPerKey());

        // Convert the PCollection to a form that can be written to a file
        PCollection<String> output = sumPerKey.apply(ParDo.of(new TransTypeFromKVAndMakeStringFn()));

        // Write to the file
        output.apply(TextIO.write().to(COMBINE_OUTPUT_FILE_PATH));

        // run: execute with the runner specified in the Pipeline options
        // waitUntilFinish: wait for the Pipeline to finish and return the final state
        pipeline.run().waitUntilFinish();
    }


}

The same code using method chaining

This version is much cleaner.

package com.company;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;

/**
 * Main class
 */
public class Main {
    /**
     * Function object
     * Splits a given "String str,String num" line on ","
     * and converts num to Integer to build a {@code KV<String, Integer>}
     */
    static class SplitWordsAndMakeKVFn extends DoFn<String, KV<String, Integer>> {
        @ProcessElement
        // ProcessContext is the object that represents the input
        // The Beam SDK supplies it, so you do not have to define it yourself
        public void processElement(ProcessContext c) {
            // Split on ","
            String[] words = c.element().split(",");
            // Use words[0] as K and words[1], converted to Integer, as V
            c.output(KV.of(words[0], Integer.parseInt(words[1])));
        }
    }


    /**
     * Function object
     * Converts {@code KV<String, Integer>} to String
     */
    static class TransTypeFromKVAndMakeStringFn extends DoFn<KV<String, Integer>, String> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            // Convert the input to String
            c.output(String.valueOf(c.element()));
        }
    }


    /**
     * Input data path
     */
    private static final String INPUT_FILE_PATH = "./sample.txt";
    /**
     * Output data path
     */
    private static final String COMBINE_OUTPUT_FILE_PATH = "./src/main/resources/combine_result/result.csv";

    /**
     * Main method
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());
        pipeline
                .apply(TextIO.read().from(INPUT_FILE_PATH))
                .apply(ParDo.of(new SplitWordsAndMakeKVFn()))
                .apply(Sum.integersPerKey())
                .apply(ParDo.of(new TransTypeFromKVAndMakeStringFn()))
                .apply(TextIO.write().to(COMBINE_OUTPUT_FILE_PATH));
        pipeline.run().waitUntilFinish();
    }


}

Input file

sample.txt


Java,1
Python,5
Go,1
Java,3
Java,2
Go,5
Python,2
Go,2
Go,9
Python,6

Execution result

The following three files are generated.

result.csv-00000-of-00003
result.csv-00001-of-00003
result.csv-00002-of-00003

The contents of each file are as follows. Because the pipeline runs as distributed parallel processing, which result ends up in which file varies from run to run.

result.csv-00000-of-00003

KV{Python, 13}

result.csv-00001-of-00003

KV{Java, 6}

result.csv-00002-of-00003

KV{Go, 17}

Related articles

Apache Beam 2.0.x with Google Cloud Dataflow starting with IntelliJ and Gradle (Qiita)

Introduction to Apache Beam with Cloud Dataflow (over 2.0.0 series) ~ Basic part ~ ParDo ~ (Qiita)

Introduction to Apache Beam with Google Cloud Dataflow (over 2.0.x series) ~ Basic GroupByKey Edition ~ (Qiita)

Sites I used as references

Beam Programming Guide

Combine collection and value|Cloud Dataflow documentation| Google Cloud Platform

API Reference
