This section describes the basic usage of GroupByKey, one of the five Core Transforms of Apache Beam. I hope to cover CoGroupByKey and the others at another time.
For the basics of Apache Beam and Cloud Dataflow, see here.
I wrote this article with reference to the official Beam Programming Guide.
GroupByKey is a parallel reduction operation, corresponding to the Shuffle phase of a Map/Shuffle/Reduce-style algorithm. As the name implies, it is a Core Transform that groups a collection by key: it creates a new collection by combining the key-value pairs that share the same key but have different values. It is useful for aggregating data that has a common key.
For example, suppose you have the keys Java, Python, and Go, where each key appears multiple times, each time with a different number as its value. A map like this before conversion, in which one key is associated with multiple values, is called a multimap.
Java,1
Python,5
Go,1
Java,3
Java,2
Go,5
Python,2
Go,2
Go,9
Python,6
Applying GroupByKey to the key-value multimap collection above gives the following result.
Java [1, 3, 2]
Python [5, 2, 6]
Go [1, 5, 2, 9]
After conversion, this map is called a uni-map: each of the unique keys Java, Python, and Go is assigned a collection of numbers. (The order of values within each group is not guaranteed.)
In the Beam SDK for Java, key-value pairs are expressed differently from ordinary Java: a key-value object is represented by the type KV<K, V>.
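As a minimal sketch of my own (the class name KVDemo and the in-memory Create.of input are not part of the article's example), here is how a KV pair is created and read, and how the multimap above could be built in memory and grouped. The main example below reads the same data from a file instead.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public class KVDemo {
    public static void main(String[] args) {
        // A key-value pair in Beam: key "Java", value 1
        KV<String, Integer> pair = KV.of("Java", 1);
        System.out.println(pair.getKey() + " -> " + pair.getValue()); // Java -> 1

        // Build part of the multimap in memory and group it by key
        Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
        PCollection<KV<String, Integer>> multimap = p.apply(Create.of(
                KV.of("Java", 1), KV.of("Java", 3), KV.of("Go", 1)));
        // After GroupByKey: e.g. KV{Java, [1, 3]} and KV{Go, [1]}
        PCollection<KV<String, Iterable<Integer>>> unimap =
                multimap.apply(GroupByKey.<String, Integer>create());
        p.run().waitUntilFinish();
    }
}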
sample.txt
Java,1
Python,5
Go,1
Java,3
Java,2
Go,5
Python,2
Go,2
Go,9
Python,6
Each step is described in a comment in the code. To prioritize understanding, I avoid method chains as much as possible, which makes the code somewhat verbose.
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

/**
 * Main
 * Created by sekiguchikai on 2017/07/12.
 */
public class Main {

    /**
     * Function object.
     * Splits the given string "str,num" on ",",
     * converts num to an Integer, and outputs a KV<String, Integer>.
     */
    static class SplitWordsAndMakeKVFn extends DoFn<String, KV<String, Integer>> {
        // ProcessContext is an object that represents the input;
        // the Beam SDK passes it in for you, so you don't define it yourself
        @ProcessElement
        public void processElement(ProcessContext c) {
            // Split on ","
            String[] words = c.element().split(",");
            // words[0] becomes the key K; words[1] is parsed to an Integer and becomes the value V
            c.output(KV.of(words[0], Integer.parseInt(words[1])));
        }
    }

    /**
     * Function object.
     * Converts KV<String, Iterable<Integer>> to String.
     */
    static class TransTypeFromKVAndMakeStringFn extends DoFn<KV<String, Iterable<Integer>>, String> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            // Convert the input to String
            c.output(String.valueOf(c.element()));
        }
    }

    /**
     * Input data path
     */
    private static final String INPUT_FILE_PATH = "./sample.txt";

    /**
     * Output data path
     */
    private static final String OUTPUT_FILE_PATH = "./result.csv";

    /**
     * Main.
     * For the sake of understanding, method chains are avoided as much as possible,
     * so the code is somewhat verbose.
     *
     * @param args arguments
     */
    public static void main(String[] args) {
        // First, create the options to set on the Pipeline.
        // This time we run locally; in local mode DirectRunner is already the default,
        // so there is no need to configure a runner explicitly.
        PipelineOptions options = PipelineOptionsFactory.create();

        // Generate a Pipeline from the options
        Pipeline pipeline = Pipeline.create(options);

        // Read the input data and create a PCollection (a dataset within the pipeline) from it
        PCollection<String> lines = pipeline.apply(TextIO.read().from(INPUT_FILE_PATH));

        // Split the given string "str,num" on ",", convert num to an Integer,
        // and make a KV<String, Integer>
        PCollection<KV<String, Integer>> kvCounter = lines.apply(ParDo.of(new SplitWordsAndMakeKVFn()));

        // GroupByKey produces elements shaped like {Go, [2, 9, 1, 5]}
        // (the order within each group is not guaranteed).
        // GroupByKey.<K, V>create() creates a GroupByKey<K, V>
        PCollection<KV<String, Iterable<Integer>>> groupedWords = kvCounter.apply(
                GroupByKey.<String, Integer>create());

        // Convert KV<String, Iterable<Integer>> to String for output
        PCollection<String> output = groupedWords.apply(ParDo.of(new TransTypeFromKVAndMakeStringFn()));

        // Write the results
        output.apply(TextIO.write().to(OUTPUT_FILE_PATH));

        // run: run the pipeline with the runner specified in the pipeline options
        // waitUntilFinish: wait for the pipeline to finish and return the final state
        pipeline.run().waitUntilFinish();
    }
}
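As the comments above note, DirectRunner is already the default when running locally, so no runner configuration is needed. If you want to set the runner explicitly (for example, as a stepping stone to swapping in DataflowRunner for Cloud Dataflow), a sketch like the following should work, assuming the beam-runners-direct-java artifact is on the classpath:

import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Explicitly select DirectRunner instead of relying on the local default
PipelineOptions options = PipelineOptionsFactory.create();
options.setRunner(DirectRunner.class);
Pipeline pipeline = Pipeline.create(options);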
Incidentally, written with method chaining it looks like this. It is pretty neat.
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;

/**
 * Main
 * Created by sekiguchikai on 2017/07/12.
 */
public class Main {

    /**
     * Function object.
     * Splits the given string "str,num" on ",",
     * converts num to an Integer, and outputs a KV<String, Integer>.
     */
    static class SplitWordsAndMakeKVFn extends DoFn<String, KV<String, Integer>> {
        // ProcessContext is an object that represents the input;
        // the Beam SDK passes it in for you, so you don't define it yourself
        @ProcessElement
        public void processElement(ProcessContext c) {
            // Split on ","
            String[] words = c.element().split(",");
            // words[0] becomes the key K; words[1] is parsed to an Integer and becomes the value V
            c.output(KV.of(words[0], Integer.parseInt(words[1])));
        }
    }

    /**
     * Function object.
     * Converts KV<String, Iterable<Integer>> to String.
     */
    static class TransTypeFromKVAndMakeStringFn extends DoFn<KV<String, Iterable<Integer>>, String> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            // Convert the input to String
            c.output(String.valueOf(c.element()));
        }
    }

    /**
     * Input data path
     */
    private static final String INPUT_FILE_PATH = "./sample.txt";

    /**
     * Output data path
     */
    private static final String OUTPUT_FILE_PATH = "./result.csv";

    /**
     * Main. The same pipeline, this time written with method chaining.
     *
     * @param args arguments
     */
    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());

        // The whole pipeline written as a method chain
        pipeline.apply(TextIO.read().from(INPUT_FILE_PATH))
                .apply(ParDo.of(new SplitWordsAndMakeKVFn()))
                .apply(GroupByKey.<String, Integer>create())
                .apply(ParDo.of(new TransTypeFromKVAndMakeStringFn()))
                .apply(TextIO.write().to(OUTPUT_FILE_PATH));

        // run: run the pipeline with the runner specified in the pipeline options
        // waitUntilFinish: wait for the pipeline to finish and return the final state
        pipeline.run().waitUntilFinish();
    }
}
The following three files are generated: result.csv-00000-of-00003, result.csv-00001-of-00003, and result.csv-00002-of-00003.
The contents of each file are as follows. Because the processing runs as distributed parallel processing, some files end up empty while others contain one or two records, and which record is written to which file varies from run to run.
result.csv-00000-of-00003
(no contents)
result.csv-00001-of-00003
KV{Java, [1, 3, 2]}
result.csv-00002-of-00003
KV{Go, [5, 2, 9, 1]}
KV{Python, [5, 2, 6]}
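By the way, if you would rather have a single output file than three shards, TextIO lets you control the number of shards. A small sketch (note that forcing a single shard limits the parallelism of the write step):

// Write everything into exactly one file, e.g. result.csv-00000-of-00001
output.apply(TextIO.write().to(OUTPUT_FILE_PATH).withNumShards(1));
// Equivalent for the single-file case:
// output.apply(TextIO.write().to(OUTPUT_FILE_PATH).withoutSharding());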
Introduction to Apache Beam with Cloud Dataflow (2.0.0 series and later) ~ Basics ~ ParDo ~ - Qiita
Apache Beam 2.0.x with Google Cloud Dataflow starting with IntelliJ and Gradle - Qiita
Combine with GroupByKey | Cloud Dataflow documentation | Google Cloud Platform