This section describes the basic usage of Combine, one of the five Core Transforms of Apache Beam. The basics of the other Core Transforms, and of Apache Beam 2.0.x itself, are covered in the following articles:
Apache Beam 2.0.x with Google Cloud Dataflow starting with IntelliJ and Gradle - Qiita
Introduction to Apache Beam with Cloud Dataflow (2.0.0 series) - Basics - ParDo - Qiita
This article was written with reference to the following two official documents:
Beam Programming Guide
Combine collection and value | Cloud Dataflow documentation | Google Cloud Platform
Combine merges the elements (the individual data items) of a PCollection. I think of it as roughly the Reduce step in Map / Shuffle / Reduce.
There are two main ways to combine: combining the elements of an entire PCollection into a single value, and combining the values of a PCollection grouped by key into a single value per key. Each method is described below.
Combining every element in a PCollection should not be confused with ParDo. ParDo applies some processing to each element of a PCollection independently, whereas Combine merges the elements together. A contrast is sketched below.
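To make the contrast concrete, here is a minimal sketch (ints is an assumed PCollection&lt;Integer&gt;; the DoFn is invented for illustration). ParDo produces an output for each input element, while Combine reduces the whole PCollection to one value.

PCollection<Integer> doubled = ints.apply(ParDo.of(new DoFn<Integer, Integer>() {
    @ProcessElement
    public void processElement(ProcessContext c) {
        // ParDo: one output per input element (here, each value doubled)
        c.output(c.element() * 2);
    }
}));

// Combine: all elements reduced to a single value
PCollection<Integer> total = ints.apply(Sum.integersGlobally());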
For example, the following combines the elements of a PCollection into a single value.
PCollection<Integer> sum = pCollection.apply(Sum.integersGlobally());
At first glance no Combine appears here, but Sum.integersGlobally() wraps Combine.globally. The actual implementation of Sum.integersGlobally() is shown below.
public static Combine.Globally<Integer, Integer> integersGlobally() {
    return Combine.globally(Sum.ofIntegers());
}
Reference: API Reference
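For reference, the same sum can also be written directly with Combine.globally by supplying a SerializableFunction. This is a sketch rather than code from the original article; it assumes pCollection is the PCollection&lt;Integer&gt; from the example above and that Combine and SerializableFunction are imported from org.apache.beam.sdk.transforms.

PCollection<Integer> sum = pCollection.apply(
        Combine.globally(new SerializableFunction<Iterable<Integer>, Integer>() {
            @Override
            public Integer apply(Iterable<Integer> input) {
                // Sum every element handed to this combiner
                int total = 0;
                for (int value : input) {
                    total += value;
                }
                return total;
            }
        }));

Note that a function used this way must be commutative and associative, because the runner is free to apply it to arbitrary subsets of the elements and then combine the partial results.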
withoutDefaults(): if you want an empty output when an empty PCollection is given as input, add withoutDefaults().
PCollection<Integer> sum = integerPCollection.apply(Sum.integersGlobally().withoutDefaults());
With the global window, the default behavior is to return a PCollection containing one item.
With a non-global window, this default behavior does not apply; instead, you specify an option when using Combine. The official documentation explains this well, so it is quoted below. (At the time of writing, the Apache Beam 2.0.x documentation did not yet include this description, so the quote is from the official Google Cloud Dataflow 1.9 documentation.)
Specify .withoutDefaults. In this case, windows that are empty in the input PCollection will likewise be empty in the output PCollection.
Specify .asSingletonView. In this case, the output is immediately converted to a PCollectionView, which supplies a default value for each empty window when used as a side input. In general, this option should only be used if the result of the pipeline's Combine is later used as a side input in the pipeline.
Quote source: Combine collection and value | Cloud Dataflow documentation | Google Cloud Platform
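As a rough sketch of the .asSingletonView option (not from the original article; integerPCollection and the downstream ParDo are illustrative, and PCollectionView comes from org.apache.beam.sdk.values), the combined result becomes a PCollectionView that a later transform consumes as a side input:

// Combine into a singleton view; an empty window yields the default value (0 for a sum)
PCollectionView<Integer> sumView =
        integerPCollection.apply(Sum.integersGlobally().asSingletonView());

// Read the view as a side input in a later ParDo
PCollection<String> withTotal = integerPCollection.apply(
        ParDo.of(new DoFn<Integer, String>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                int total = c.sideInput(sumView);
                c.output(c.element() + " / " + total);
            }
        }).withSideInputs(sumView));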
Each processing step is described in a comment in the code. To prioritize understanding, method chaining is avoided as much as possible, so the code is redundant.
package com.company;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.PCollection;

/**
 * Main class
 */
public class Main {

    /**
     * Function object
     * Converts String => Integer
     */
    static class TransformTypeFromStringToInteger extends DoFn<String, Integer> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            // Convert the String element to an Integer and output it
            c.output(Integer.parseInt(c.element()));
        }
    }

    /**
     * Function object
     * Converts Integer => String
     */
    static class TransformTypeFromIntegerToString extends DoFn<Integer, String> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            // Convert the Integer element to a String and output it
            c.output(String.valueOf(c.element()));
        }
    }

    /**
     * Input data path
     */
    private static final String INPUT_FILE_PATH = "./sample.txt";

    /**
     * Output data path
     */
    private static final String OUTPUT_FILE_PATH = "./result.txt";

    /**
     * Main method.
     * Method chaining is avoided as much as possible for readability,
     * so some parts are redundant.
     *
     * @param args
     */
    public static void main(String[] args) {
        // Generate the Pipeline with options
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());

        // Read from the input file
        PCollection<String> lines = pipeline.apply(TextIO.read().from(INPUT_FILE_PATH));

        // Convert each element read from String to Integer
        PCollection<Integer> integerPCollection = lines.apply(ParDo.of(new TransformTypeFromStringToInteger()));

        // Sum all elements of the PCollection with Combine.globally
        // (withoutDefaults() makes an empty input yield an empty output)
        PCollection<Integer> sum = integerPCollection.apply(Sum.integersGlobally().withoutDefaults());

        // Convert the Integer sum back to a String
        PCollection<String> sumString = sum.apply(ParDo.of(new TransformTypeFromIntegerToString()));

        // Write to the output file
        sumString.apply(TextIO.write().to(OUTPUT_FILE_PATH));

        // Run the pipeline
        pipeline.run().waitUntilFinish();
    }
}
The same pipeline written with method chaining is much cleaner:
package com.company;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;

/**
 * Main class
 */
public class Main {

    /**
     * Function object
     * Converts String => Integer
     */
    static class TransformTypeFromStringToInteger extends DoFn<String, Integer> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            // Convert the String element to an Integer and output it
            c.output(Integer.parseInt(c.element()));
        }
    }

    /**
     * Function object
     * Converts Integer => String
     */
    static class TransformTypeFromIntegerToString extends DoFn<Integer, String> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            // Convert the Integer element to a String and output it
            c.output(String.valueOf(c.element()));
        }
    }

    /**
     * Input data path
     */
    private static final String INPUT_FILE_PATH = "./sample.txt";

    /**
     * Output data path
     */
    private static final String OUTPUT_FILE_PATH = "./result.txt";

    /**
     * Main method
     *
     * @param args
     */
    public static void main(String[] args) {
        // Generate the Pipeline
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());

        // Read, convert to Integer, sum, convert back to String, and write
        pipeline.apply(TextIO.read().from(INPUT_FILE_PATH))
                .apply(ParDo.of(new TransformTypeFromStringToInteger()))
                .apply(Sum.integersGlobally().withoutDefaults())
                .apply(ParDo.of(new TransformTypeFromIntegerToString()))
                .apply(TextIO.write().to(OUTPUT_FILE_PATH));

        // Run the pipeline
        pipeline.run().waitUntilFinish();
    }
}
sample.txt
1
2
3
4
5
6
7
8
9
10
result.txt-00000-of-00001 is output. The contents of result.txt-00000-of-00001:
55
What the pipeline is doing is, in effect, $\sum_{k=1}^{10} k = 55$.
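Beyond prebuilt combiners like Sum, Beam also lets you define a Combine.CombineFn with an explicit accumulator, which is useful when the output type differs from the input type. The following is a minimal sketch, not part of the pipeline above; AverageFn and Accum are names invented here, and the snippet additionally assumes java.io.Serializable plus DefaultCoder and SerializableCoder from org.apache.beam.sdk.coders.

// A CombineFn that averages Integers; the accumulator carries {sum, count}
static class AverageFn extends Combine.CombineFn<Integer, AverageFn.Accum, Double> {

    @DefaultCoder(SerializableCoder.class)
    static class Accum implements Serializable {
        int sum = 0;
        int count = 0;
    }

    @Override
    public Accum createAccumulator() {
        return new Accum();
    }

    @Override
    public Accum addInput(Accum accum, Integer input) {
        // Fold one input element into the accumulator
        accum.sum += input;
        accum.count++;
        return accum;
    }

    @Override
    public Accum mergeAccumulators(Iterable<Accum> accums) {
        // Merge partial accumulators produced on different workers
        Accum merged = createAccumulator();
        for (Accum accum : accums) {
            merged.sum += accum.sum;
            merged.count += accum.count;
        }
        return merged;
    }

    @Override
    public Double extractOutput(Accum accum) {
        return accum.count == 0 ? 0.0 : ((double) accum.sum) / accum.count;
    }
}

// Usage: PCollection<Double> mean = integerPCollection.apply(Combine.globally(new AverageFn()));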
Combine PerKey: after a GroupByKey, the data takes the form of KV pairs whose value part is an iterable collection. For example:
Java [1, 2, 3]
Combine PerKey combines the iterable value part of each KV pair, key by key. For example, combining (summing) the grouped KV above yields the following.
Java [6]
All the elements of the value collection have been combined into a single value for the key.
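In code, Sum.integersPerKey() (used in the example below) gives this behavior for integer values. An equivalent per-key sum can also be expressed directly with Combine.perKey; the following is a sketch rather than the article's code, reusing the kvCounter name from the example below and assuming SerializableFunction is imported.

PCollection<KV<String, Integer>> sumPerKey = kvCounter.apply(
        Combine.<String, Integer>perKey(new SerializableFunction<Iterable<Integer>, Integer>() {
            @Override
            public Integer apply(Iterable<Integer> values) {
                // Combine the grouped values for a single key into one total
                int total = 0;
                for (int value : values) {
                    total += value;
                }
                return total;
            }
        }));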
As before, each processing step is described in a comment in the code. To prioritize understanding, method chaining is avoided as much as possible, so the code is redundant.
package com.company;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

/**
 * Main class
 */
public class Main {

    /**
     * Function object
     * Splits a line of the form "str,num" on ",",
     * parses num as an Integer, and outputs a KV<String, Integer>
     */
    static class SplitWordsAndMakeKVFn extends DoFn<String, KV<String, Integer>> {
        @ProcessElement
        // ProcessContext is an object that represents the input;
        // the Beam SDK passes it in, so you don't define it yourself
        public void processElement(ProcessContext c) {
            // Split on ","
            String[] words = c.element().split(",");
            // words[0] becomes the key; words[1], parsed as Integer, becomes the value
            c.output(KV.of(words[0], Integer.parseInt(words[1])));
        }
    }

    /**
     * Function object
     * Converts KV<String, Integer> to String
     */
    static class TransTypeFromKVAndMakeStringFn extends DoFn<KV<String, Integer>, String> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            // Convert the input to a String
            c.output(String.valueOf(c.element()));
        }
    }

    /**
     * Input data path
     */
    private static final String INPUT_FILE_PATH = "./sample.txt";

    /**
     * Output data path
     */
    private static final String COMBINE_OUTPUT_FILE_PATH = "./src/main/resources/combine_result/result.csv";

    /**
     * Main method.
     * Method chaining is avoided as much as possible for readability,
     * so some parts are redundant.
     *
     * @param args arguments
     */
    public static void main(String[] args) {
        // First, create the options to set on the Pipeline.
        // This pipeline runs locally; DirectRunner is already the default
        // in local mode, so no runner needs to be configured.
        PipelineOptions options = PipelineOptionsFactory.create();

        // Generate the Pipeline from the options
        Pipeline pipeline = Pipeline.create(options);

        // Read the input data into a PCollection (a data set within the pipeline)
        PCollection<String> lines = pipeline.apply(TextIO.read().from(INPUT_FILE_PATH));

        // Split each "str,num" line on ",", parse num as an Integer,
        // and build KV<String, Integer> pairs
        PCollection<KV<String, Integer>> kvCounter = lines.apply(ParDo.of(new SplitWordsAndMakeKVFn()));

        // Combine PerKey performs a GroupByKey-style grouping as part of the operation
        PCollection<KV<String, Integer>> sumPerKey = kvCounter
                .apply(Sum.integersPerKey());

        // Convert the PCollection into a form that can be written to a file
        PCollection<String> output = sumPerKey.apply(ParDo.of(new TransTypeFromKVAndMakeStringFn()));

        // Write to the output file
        output.apply(TextIO.write().to(COMBINE_OUTPUT_FILE_PATH));

        // run: execute with the Runner specified in the pipeline options
        // waitUntilFinish: wait for the pipeline to finish and return the final state
        pipeline.run().waitUntilFinish();
    }
}
The same pipeline written with method chaining is much cleaner:
package com.company;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;

/**
 * Main class
 */
public class Main {

    /**
     * Function object
     * Splits a line of the form "str,num" on ",",
     * parses num as an Integer, and outputs a KV<String, Integer>
     */
    static class SplitWordsAndMakeKVFn extends DoFn<String, KV<String, Integer>> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            // Split on ","
            String[] words = c.element().split(",");
            // words[0] becomes the key; words[1], parsed as Integer, becomes the value
            c.output(KV.of(words[0], Integer.parseInt(words[1])));
        }
    }

    /**
     * Function object
     * Converts KV<String, Integer> to String
     */
    static class TransTypeFromKVAndMakeStringFn extends DoFn<KV<String, Integer>, String> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            // Convert the input to a String
            c.output(String.valueOf(c.element()));
        }
    }

    /**
     * Input data path
     */
    private static final String INPUT_FILE_PATH = "./sample.txt";

    /**
     * Output data path
     */
    private static final String COMBINE_OUTPUT_FILE_PATH = "./src/main/resources/combine_result/result.csv";

    /**
     * Main method
     *
     * @param args arguments
     */
    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());
        pipeline
                .apply(TextIO.read().from(INPUT_FILE_PATH))
                .apply(ParDo.of(new SplitWordsAndMakeKVFn()))
                .apply(Sum.integersPerKey())
                .apply(ParDo.of(new TransTypeFromKVAndMakeStringFn()))
                .apply(TextIO.write().to(COMBINE_OUTPUT_FILE_PATH));
        pipeline.run().waitUntilFinish();
    }
}
sample.txt
Java,1
Python,5
Go,1
Java,3
Java,2
Go,5
Python,2
Go,2
Go,9
Python,6
The following three files are generated:
result.csv-00000-of-00003
result.csv-00001-of-00003
result.csv-00002-of-00003
The contents of each file are shown below. Because processing is performed in distributed parallel fashion, which element ends up in which file varies from run to run.
result.csv-00000-of-00003
KV{Python, 13}
result.csv-00001-of-00003
KV{Java, 6}
result.csv-00002-of-00003
KV{Go, 17}
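Incidentally, if a single output file is preferred for a small result like this, TextIO.write() can fix the shard count. A sketch, reusing the output variable from the example above; forcing one shard gives up write parallelism:

output.apply(TextIO.write().to(COMBINE_OUTPUT_FILE_PATH).withoutSharding());
// or, equivalently: .withNumShards(1)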
Apache Beam 2.0.x with Google Cloud Dataflow starting with IntelliJ and Gradle - Qiita
Introduction to Apache Beam with Cloud Dataflow (2.0.0 series) - Basics - ParDo - Qiita
Beam Programming Guide
Combine collection and value | Cloud Dataflow documentation | Google Cloud Platform
API Reference