Introduction to Apache Beam with Google Cloud Dataflow (2.0.x series and later) ~ GroupByKey Basics ~

This article covers the basic usage of GroupByKey, one of the five Core Transforms in Apache Beam. I plan to cover CoGroupByKey and related transforms in a separate article.

For the basics of Apache Beam and Cloud Dataflow, see the related articles listed at the end of this article.

This article was written with reference to the official Beam Programming Guide.

What is GroupByKey?

GroupByKey is a parallel reduction operation. It corresponds to the Shuffle phase of a Map / Shuffle / Reduce-style algorithm. As the name implies, it is a Core Transform that groups a collection by key. It takes a collection of key-value pairs in which the same key can appear several times with different values, and produces a new collection in which all the values for each key are gathered together. This is useful for aggregating data that shares a common key.

multimap and uni-map

multimap

For example, suppose the keys are Java, Python, and Go, and each key appears several times, each time paired with a number. A map like this, before the transform is applied, is called a multimap.

Java,1
Python,5
Go,1
Java,3
Java,2
Go,5
Python,2
Go,2
Go,9
Python,6

uni-map

Applying GroupByKey to the key-value multimap collection above gives the following result.

Java [1, 3, 2]
Python [5, 2, 6]
Go [1, 5, 2, 9]

The map after the transform is called a uni-map: each of the unique keys Java, Python, and Go maps to a collection of numbers.
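To make the multimap to uni-map conversion concrete, here is a minimal plain-Java sketch of the same grouping. It is only an illustration of the idea: the class name GroupByKeySketch and the method groupByKey are made up for this example, it uses java.util collections rather than the Beam SDK, and unlike Beam it runs on a single machine and keeps the values in input order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only. Beam's GroupByKey is distributed and does not use this code.
class GroupByKeySketch {

    // Groups "key,number" lines by key. Keys keep first-seen order, values keep input order.
    static Map<String, List<Integer>> groupByKey(List<String> lines) {
        Map<String, List<Integer>> grouped = new LinkedHashMap<>();
        for (String line : lines) {
            String[] parts = line.split(",");
            grouped.computeIfAbsent(parts[0], k -> new ArrayList<>())
                   .add(Integer.parseInt(parts[1]));
        }
        return grouped;
    }

    public static void main(String[] args) {
        // The multimap from the example above
        List<String> multimap = Arrays.asList(
                "Java,1", "Python,5", "Go,1", "Java,3", "Java,2",
                "Go,5", "Python,2", "Go,2", "Go,9", "Python,6");

        // Prints {Java=[1, 3, 2], Python=[5, 2, 6], Go=[1, 5, 2, 9]}
        System.out.println(groupByKey(multimap));
    }
}
```

In a real pipeline the grouping is done by the runner across workers, so neither the order of the keys nor the order of the values within a key should be relied on.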

How key-value pairs are represented in the Beam SDK for Java

In the Beam SDK for Java, a key-value pair is represented differently from ordinary Java: it is an object of type KV<K, V>.
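As a rough comparison, the closest standard-library counterpart of KV<K, V> is an immutable Map.Entry<K, V>. The sketch below uses only java.util so that it runs without the Beam SDK, and the class name KeyValueSketch and the helper pair are made up for this example. The comments show the corresponding Beam calls, KV.of, getKey, and getValue, which are the ones used in the code later in this article.

```java
import java.util.AbstractMap;
import java.util.Map;

// Standard-library stand-in for Beam's KV, for illustration only.
class KeyValueSketch {

    // Builds an immutable key-value pair. Beam equivalent: KV.of(key, value)
    static Map.Entry<String, Integer> pair(String key, int value) {
        return new AbstractMap.SimpleImmutableEntry<>(key, value);
    }

    public static void main(String[] args) {
        // Beam equivalent: KV<String, Integer> kv = KV.of("Java", 1);
        Map.Entry<String, Integer> kv = pair("Java", 1);

        System.out.println(kv.getKey());   // Beam equivalent: kv.getKey()
        System.out.println(kv.getValue()); // Beam equivalent: kv.getValue()
    }
}
```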

Writing the code

File to read

sample.txt


Java,1
Python,5
Go,1
Java,3
Java,2
Go,5
Python,2
Go,2
Go,9
Python,6

Actual Java code

Each step is explained in a comment in the code. To keep the steps easy to follow, I avoid method chaining as much as possible, so the code is more verbose than it needs to be.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;


/**
 * Main
 * Created by sekiguchikai on 2017/07/12.
 */
public class Main {
    /**
     * Function object.
     * Splits an input line of the form "str,num" at the comma,
     * converts num to an Integer, and emits a KV<String, Integer>.
     */
    static class SplitWordsAndMakeKVFn extends DoFn<String, KV<String, Integer>> {
        // ProcessContext gives access to the input element and the output.
        // The Beam SDK supplies it, so you do not need to create it yourself.
        @ProcessElement
        public void processElement(ProcessContext c) {
            // Split the line at ","
            String[] words = c.element().split(",");
            // Use words[0] as the key (K) and words[1], converted to Integer, as the value (V)
            c.output(KV.of(words[0], Integer.parseInt(words[1])));
        }
    }

    /**
     * Function object.
     * Converts a KV<String, Iterable<Integer>> to a String.
     */
    static class TransTypeFromKVAndMakeStringFn extends DoFn<KV<String, Iterable<Integer>>, String> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            // Convert the input element to its String representation
            c.output(String.valueOf(c.element()));
        }
    }


    /**
     * Input data path
     */
    private static final String INPUT_FILE_PATH = "./sample.txt";
    /**
     * Output data path
     */
    private static final String OUTPUT_FILE_PATH = "./result.csv";

    /**
     * Main.
     * To keep each step easy to follow, method chaining is avoided where possible,
     * so the code is more verbose than it needs to be.
     *
     * @param args command-line arguments
     */
    public static void main(String[] args) {
        // First, create the options to set on the Pipeline.
        // This example runs locally, so DirectRunner is used.
        // DirectRunner is the default when running locally, so no runner needs to be configured.
        PipelineOptions options = PipelineOptionsFactory.create();

        // Create the Pipeline from the options
        Pipeline pipeline = Pipeline.create(options);

        // Read the input data and create a PCollection (a data set in the pipeline) from it
        PCollection<String> lines = pipeline.apply(TextIO.read().from(INPUT_FILE_PATH));

        // Split each "str,num" line at the comma, convert num to Integer, and build a KV<String, Integer>
        PCollection<KV<String, Integer>> kvCounter = lines.apply(ParDo.of(new SplitWordsAndMakeKVFn()));

        // GroupByKey reshapes the data into elements like {Go, [2, 9, 1, 5]}.
        // GroupByKey.<K, V>create() creates the GroupByKey<K, V> transform.
        PCollection<KV<String, Iterable<Integer>>> groupedWords = kvCounter.apply(
                GroupByKey.<String, Integer>create());

        // Convert from KV<String, Iterable<Integer>> to String for output
        PCollection<String> output = groupedWords.apply(ParDo.of(new TransTypeFromKVAndMakeStringFn()));

        // Write the output
        output.apply(TextIO.write().to(OUTPUT_FILE_PATH));

        // run: runs the pipeline with the runner specified in the pipeline options
        // waitUntilFinish: waits for the pipeline to finish and returns its final state
        pipeline.run().waitUntilFinish();
    }
}

With method chaining, the same pipeline looks like this. It comes out much more compact.


import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;


/**
 * Main
 * Created by sekiguchikai on 2017/07/12.
 */
public class Main {
    /**
     * Function object.
     * Splits an input line of the form "str,num" at the comma,
     * converts num to an Integer, and emits a KV<String, Integer>.
     */
    static class SplitWordsAndMakeKVFn extends DoFn<String, KV<String, Integer>> {
        // ProcessContext gives access to the input element and the output.
        // The Beam SDK supplies it, so you do not need to create it yourself.
        @ProcessElement
        public void processElement(ProcessContext c) {
            // Split the line at ","
            String[] words = c.element().split(",");
            // Use words[0] as the key (K) and words[1], converted to Integer, as the value (V)
            c.output(KV.of(words[0], Integer.parseInt(words[1])));
        }
    }

    /**
     * Function object.
     * Converts a KV<String, Iterable<Integer>> to a String.
     */
    static class TransTypeFromKVAndMakeStringFn extends DoFn<KV<String, Iterable<Integer>>, String> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            // Convert the input element to its String representation
            c.output(String.valueOf(c.element()));
        }
    }


    /**
     * Input data path
     */
    private static final String INPUT_FILE_PATH = "./sample.txt";
    /**
     * Output data path
     */
    private static final String OUTPUT_FILE_PATH = "./result.csv";

    /**
     * Main.
     * This version builds the same pipeline with method chaining.
     *
     * @param args command-line arguments
     */
    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());

        // The same pipeline written with method chaining
        pipeline.apply(TextIO.read().from(INPUT_FILE_PATH))
                .apply(ParDo.of(new SplitWordsAndMakeKVFn()))
                .apply(GroupByKey.<String, Integer>create())
                .apply(ParDo.of(new TransTypeFromKVAndMakeStringFn()))
                .apply(TextIO.write().to(OUTPUT_FILE_PATH));

        // run: runs the pipeline with the runner specified in the pipeline options
        // waitUntilFinish: waits for the pipeline to finish and returns its final state
        pipeline.run().waitUntilFinish();
    }
}
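SplitWordsAndMakeKVFn above assumes that every input line has exactly the form key,number. A blank or malformed line would make words[1] or Integer.parseInt throw. The following is a minimal sketch of a more defensive version of that parsing step, shown in isolation with the standard library only. The class name LineParserSketch and the helper parseLine are hypothetical, and silently skipping bad lines is an assumption of this sketch, not something the original code does.

```java
import java.util.AbstractMap;
import java.util.Map;
import java.util.Optional;

// Defensive parsing of "key,number" lines, for illustration only.
class LineParserSketch {

    // Returns the parsed pair, or Optional.empty() when the line is not "key,integer".
    static Optional<Map.Entry<String, Integer>> parseLine(String line) {
        // limit -1 keeps trailing empty fields, so "Java," still yields two parts
        String[] parts = line.split(",", -1);
        if (parts.length != 2 || parts[0].trim().isEmpty()) {
            return Optional.empty();
        }
        try {
            int value = Integer.parseInt(parts[1].trim());
            Map.Entry<String, Integer> entry =
                    new AbstractMap.SimpleImmutableEntry<>(parts[0].trim(), value);
            return Optional.of(entry);
        } catch (NumberFormatException e) {
            // The second field is not an integer
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(parseLine("Java,1").isPresent()); // prints true
        System.out.println(parseLine("").isPresent());       // prints false
        System.out.println(parseLine("Go,abc").isPresent()); // prints false
    }
}
```

Inside processElement, the same check would mean calling c.output(KV.of(key, value)) only when the line parses, so that one bad line does not fail the whole pipeline.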

Execution result

The following three files are generated.

result.csv-00000-of-00003
result.csv-00001-of-00003
result.csv-00002-of-00003

The contents of each file are as follows. Because the pipeline runs as distributed parallel processing, some files are empty and others contain one or two lines. Which line ends up in which file changes from run to run. The order of the values within each group is not guaranteed either, which is why Go's values below do not appear in input order.

result.csv-00000-of-00003

(empty)

result.csv-00001-of-00003

KV{Java, [1, 3, 2]}

result.csv-00002-of-00003

KV{Go, [5, 2, 9, 1]}
KV{Python, [5, 2, 6]}
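The output above lists Go's values as [5, 2, 9, 1], which is not the order in which they appear in sample.txt. When checking grouped results against expected values, it therefore helps to compare them in an order-insensitive way. The following plain-Java sketch shows one way to do that by sorting keys and values before comparing. The class name GroupedResultCheck and the method normalize are made up for this example, and it works on ordinary java.util maps rather than on Beam types.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Order-insensitive comparison of grouped results, for illustration only.
class GroupedResultCheck {

    // Returns a copy with keys sorted and each value list sorted in ascending order.
    static Map<String, List<Integer>> normalize(Map<String, List<Integer>> grouped) {
        Map<String, List<Integer>> normalized = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
            List<Integer> sorted = new ArrayList<>(e.getValue());
            Collections.sort(sorted);
            normalized.put(e.getKey(), sorted);
        }
        return normalized;
    }

    public static void main(String[] args) {
        // Values as they appeared in the execution result above
        Map<String, List<Integer>> actual = new HashMap<>();
        actual.put("Go", Arrays.asList(5, 2, 9, 1));
        actual.put("Java", Arrays.asList(1, 3, 2));
        actual.put("Python", Arrays.asList(5, 2, 6));

        // Values in input-file order
        Map<String, List<Integer>> expected = new HashMap<>();
        expected.put("Java", Arrays.asList(1, 3, 2));
        expected.put("Python", Arrays.asList(5, 2, 6));
        expected.put("Go", Arrays.asList(1, 5, 2, 9));

        // Prints true: both hold the same values per key, only the order differs
        System.out.println(normalize(actual).equals(normalize(expected)));
    }
}
```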

Related articles

Introduction to Apache Beam with Cloud Dataflow (over 2.0.0 series) ~ Basic part ~ ParDo ~ - Qiita

Apache Beam 2.0.x with Google Cloud Dataflow starting with IntelliJ and Gradle - Qiita

References

Beam Programming Guide

Combine with GroupByKey | Cloud Dataflow documentation | Google Cloud Platform
