Cette section décrit l'utilisation de base de Combine, l'une des cinq principales transformations d'Apache Beam. L'histoire de base des autres Core Transforms et Apache Beam 2.0.x en premier lieu est décrite ci-dessous.
Apache Beam 2.0.x avec Google Cloud Dataflow - Qiita commençant par IntelliJ et Gradle
Cet article est rédigé en référence aux deux documents officiels suivants.
Combinez collection et valeur|Documentation Cloud Dataflow| Google Cloud Platform
Combine combine ou fusionne chaque élément (chaque donnée) existant dans la PCollection. Je reconnais que c'est comme Réduire dans Map / Shuffle / Réduire.
Il existe deux manières principales de combiner. "Comment combiner des éléments existant dans une PCollection pour générer une valeur" et "Comment combiner chaque élément de la partie Value de PCollection groupé par clé pour générer une valeur" Est. Ci-dessous, je voudrais décrire chaque méthode.
Combinez chaque élément de la PCollection. => Il convient de noter que c'est différent de ParDo. ParDo effectue un certain traitement pour chaque élément de la PCollection. Combine combine chaque élément de la PCollection.
Par exemple, c'est le cas où les éléments existant dans la PCollection sont combinés pour générer une valeur.
PCollection<Integer> sum = pCollection.apply(Sum.integersGlobally());
À première vue, Combine ne semble pas exister, mais Sum.integersGlobally () enveloppe Combine.globally. Le Sum.integersGlobally () réel est ci-dessous.
public static Combine.Globally<Integer, Integer> integersGlobally() {
return Combine.globally(Sum.ofIntegers());}
référence Référence API (https://beam.apache.org/documentation/sdks/javadoc/2.0.0/index.html?org/apache/beam/sdk/transforms/Combine.html)
withoutDefaults() Si vous voulez retourner vide quand une PCollection vide est donnée comme inout, ajoutez withoutDefaults ().
PCollection<Integer> sum = integerPCollection.apply(Sum.integersGlobally().withoutDefaults());
Dans le cas de Global Window, le comportement par défaut est de renvoyer une PCollection contenant un élément.
En revanche, dans le cas d'une fenêtre non globale, l'opération par défaut ci-dessus n'est pas effectuée. Spécifiez l'option lors de l'utilisation de la combinaison. La formule était facile à comprendre, alors je la cite ci-dessous. (Au moment de la rédaction de cet article, le document pour Apache Beam 2.0.x n'avait pas encore cette description. Google Cloud Dataflow 1.9 est cité dans le document officiel)
Spécifiez> .withoutDefaults. Dans ce cas, la fenêtre vide de l'entrée PCollection sera également vide dans Output> Collection.
Spécifiez> .asSingletonView. Dans ce cas, la sortie est immédiatement convertie en PCollectionView. Il s'agit de la valeur par défaut lorsque chaque fenêtre vide est utilisée comme entrée secondaire. En général, cette option ne doit être utilisée que si le résultat de la combinaison du pipeline est utilisé ultérieurement comme entrée secondaire dans le pipeline.
Source du devis: Combinez collection et valeur|Documentation Cloud Dataflow| Google Cloud Platform
Chaque processus est décrit comme un commentaire dans le code. Je n'utilise pas autant que possible la chaîne de méthodes pour prioriser la compréhension. Par conséquent, le code est redondant.
package com.company;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.PCollection;
/**
*Classe principale
*/
public class Main {
/**
*Objet fonctionnel
* String =>Effectuer une conversion de type entier
*/
static class TransformTypeFromStringToInteger extends DoFn<String, Integer> {
@ProcessElement
public void processElement(ProcessContext c) {
//Chaîne l'élément=>Convertir en entier et sortie
c.output(Integer.parseInt(c.element()));
}
}
/**
*Objet fonctionnel
* Integer =>Effectuer la conversion de type de chaîne
*/
static class TransformTypeFromIntegerToString extends DoFn<Integer, String> {
@ProcessElement
public void processElement(ProcessContext c) {
//Chaîne l'élément=>Convertir en entier et sortie
System.out.println(c.element());
c.output(String.valueOf(c.element()));
}
}
/**
*Chemin des données d'entrée
*/
private static final String INPUT_FILE_PATH = "./sample.txt";
/**
*Chemin de données de sortie
*/
private static final String OUTPUT_FILE_PATH = "./result.txt";
/**
*N'utilisez pas autant que possible les chaînes de méthodes pour comprendre
*Par conséquent, il existe des pièces redondantes
*Méthode principale
*
* @param args
*/
public static void main(String[] args) {
//Générer un pipeline en spécifiant une option
Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());
System.out.println("a");
//Lire à partir d'un fichier
PCollection<String> lines = pipeline.apply(TextIO.read().from(INPUT_FILE_PATH));
//Chaîne chaque donnée lue=>Convertir en entier
PCollection<Integer> integerPCollection = lines.apply(ParDo.of(new TransformTypeFromStringToInteger()));
// Combine.Résumez chaque élément de PCollection dans le monde
//Pour une PCollection vide, si vous voulez retourner vide=> PCollection<Integer> sum = integerPCollection.apply(Sum.integersGlobally().withoutDefaults());
PCollection<Integer> sum = integerPCollection.apply(Sum.integersGlobally().withoutDefaults());
// PCollection<Integer>somme Entier=>Convertir en chaîne
PCollection<String> sumString = sum.apply(ParDo.of(new TransformTypeFromIntegerToString()));
//Écrire dans un fichier
sumString.apply(TextIO.write().to(OUTPUT_FILE_PATH));
//Courir
pipeline.run().waitUntilFinish();
}
}
C'était assez propre
package com.company;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
/**
*Classe principale
*/
public class Main {
/**
*Objet fonctionnel
* String =>Effectuer une conversion de type entier
*/
static class TransformTypeFromStringToInteger extends DoFn<String, Integer> {
@ProcessElement
public void processElement(ProcessContext c) {
//Chaîne l'élément=>Convertir en entier et sortie
c.output(Integer.parseInt(c.element()));
}
}
/**
*Objet fonctionnel
* Integer =>Effectuer la conversion de type de chaîne
*/
static class TransformTypeFromIntegerToString extends DoFn<Integer, String> {
@ProcessElement
public void processElement(ProcessContext c) {
//Chaîne l'élément=>Convertir en entier et sortie
System.out.println(c.element());
c.output(String.valueOf(c.element()));
}
}
/**
*Chemin des données d'entrée
*/
private static final String INPUT_FILE_PATH = "./sample.txt";
/**
*Chemin de données de sortie
*/
private static final String OUTPUT_FILE_PATH = "./result.txt";
/**
*Méthode principale
*
* @param args
*/
public static void main(String[] args) {
//Génération de pipeline
Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());
//Partie de traitement
pipeline.apply(TextIO.read().from(INPUT_FILE_PATH))
.apply(ParDo.of(new TransformTypeFromStringToInteger()))
.apply(Sum.integersGlobally().withoutDefaults())
.apply(ParDo.of(new TransformTypeFromIntegerToString()))
.apply(TextIO.write().to(OUTPUT_FILE_PATH));
//Courir
pipeline.run().waitUntilFinish();
}
}
samole.txt
1
2
3
4
5
6
7
8
9
10
result.txt-00000-of-00001 est sorti Le contenu de result.txt-00000-of-00001
55
Que fais tu
10
Σk
k = 1
C'est comme ça.
PerKey Lorsque GroupByKey est exécuté, il devient K, V (Iterable Collection). Par exemple:
Java [1, 2, 3]
La PerKey de Combine combine la partie V [Collection Iterable] de cette K, V [Collection Iterable] pour chaque Clé. Ainsi, par exemple, si vous combinez K et V (Iterable Collection) après GroupByKey ci-dessus, ce sera comme suit.
Java [6]
Tous les éléments de V (Collection Iterable) de K et V (Collection Iterable) sont combinés.
Chaque processus est décrit comme un commentaire dans le code. Je n'utilise pas autant que possible la chaîne de méthodes pour prioriser la compréhension. Par conséquent, le code est redondant.
package com.company;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
/**
*Principale
*/
public class Main {
/**
*Objet de fonction
*Donnée String str,Numéro de chaîne","Diviser par
*Changer num en type entier et KV<String, Integer>Faire un moule
*/
static class SplitWordsAndMakeKVFn extends DoFn<String, KV<String, Integer>> {
@ProcessElement
//ProcessContext est un objet qui représente l'entrée
//Beam SDK le récupérera pour vous sans avoir à le définir vous-même
public void processElement(ProcessContext c) {
// ","Split avec
String[] words = c.element().split(",");
//Mot divisé[0]À K, mots[1]Vers un entier vers V
c.output(KV.of(words[0], Integer.parseInt(words[1])));
}
}
/**
*Objet de fonction
* KV<String, Iterable<Integer>Changer le type en type String
*/
static class TransTypeFromKVAndMakeStringFn extends DoFn<KV<String, Integer>, String> {
@ProcessElement
public void processElement(ProcessContext c) {
//Convertir l'entrée en type String
c.output(String.valueOf(c.element()));
}
}
/**
*Chemin des données d'entrée
*/
private static final String INPUT_FILE_PATH = "./sample.txt";
/**
*Chemin de données de sortie
*/
private static final String COMBINE_OUTPUT_FILE_PATH = "./src/main/resources/combine_result/result.csv";
/**
*Principale
*Par souci de compréhension, je n'utilise pas autant que possible la chaîne de méthodes
*Par conséquent, le code est redondant.
*
* @argument param args
*/
public static void main(String[] args) {
//Commencez par créer une option à définir dans Pipeline
//Cette fois, puisqu'il sera démarré localement, spécifiez DirectRunner.
//En mode local, DirectRunner est déjà la valeur par défaut, vous n'avez donc pas besoin de configurer un coureur
PipelineOptions options = PipelineOptionsFactory.create();
//Générer un pipeline en fonction de l'option
Pipeline pipeline = Pipeline.create(options);
//Lire les données d'entrée et PCollection à partir de là(Un ensemble de données dans le pipeline)Créer
PCollection<String> lines = pipeline.apply(TextIO.read().from(INPUT_FILE_PATH));
//Donnée String str,Numéro de chaîne","Diviser avec, changer num en type entier, KV<String, Integer>Faire un moule
PCollection<KV<String, Integer>> kvCounter = lines.apply(ParDo.of(new SplitWordsAndMakeKVFn()));
//Combine PerKey effectue une conversion GroupByKey dans le cadre de l'opération
PCollection<KV<String, Integer>> sumPerKey = kvCounter
.apply(Sum.integersPerKey());
//Convertir PCollection en un formulaire pouvant être généré dans un fichier
PCollection<String> output = sumPerKey.apply(ParDo.of(new TransTypeFromKVAndMakeStringFn()));
//Écrire
output.apply(TextIO.write().to(COMBINE_OUTPUT_FILE_PATH));
// run :Exécuter avec Runner spécifié par l'option PipeLine
// waitUntilFinish :Attendez que PipeLine se termine et retourne l'état final
pipeline.run().waitUntilFinish();
}
}
C'était assez propre
package com.company;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
/**
*Principale
*/
public class Main {
/**
*Objet de fonction
*Donnée String str,Numéro de chaîne","Diviser par
*Changer num en type entier et KV<String, Integer>Faire un moule
*/
static class SplitWordsAndMakeKVFn extends DoFn<String, KV<String, Integer>> {
@ProcessElement
//ProcessContext est un objet qui représente l'entrée
//Beam SDK le récupérera pour vous sans avoir à le définir vous-même
public void processElement(ProcessContext c) {
// ","Split avec
String[] words = c.element().split(",");
//Mot divisé[0]À K, mots[1]Vers un entier vers V
c.output(KV.of(words[0], Integer.parseInt(words[1])));
}
}
/**
*Objet de fonction
* KV<String, Iterable<Integer>Changer le type en type String
*/
static class TransTypeFromKVAndMakeStringFn extends DoFn<KV<String, Integer>, String> {
@ProcessElement
public void processElement(ProcessContext c) {
//Convertir l'entrée en type String
c.output(String.valueOf(c.element()));
}
}
/**
*Chemin des données d'entrée
*/
private static final String INPUT_FILE_PATH = "./sample.txt";
/**
*Chemin de données de sortie
*/
private static final String COMBINE_OUTPUT_FILE_PATH = "./src/main/resources/combine_result/result.csv";
/**
*Principale
* @argument param args
*/
public static void main(String[] args) {
Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());
pipeline
.apply(TextIO.read().from(INPUT_FILE_PATH))
.apply(ParDo.of(new SplitWordsAndMakeKVFn()))
.apply(Sum.integersPerKey())
.apply(ParDo.of(new TransTypeFromKVAndMakeStringFn()))
.apply(TextIO.write().to(COMBINE_OUTPUT_FILE_PATH));
pipeline.run().waitUntilFinish();
}
}
samole.txt
Java,1
Python,5
Go,1
Java,3
Java,2
Go,5
Python,2
Go,2
Go,9
Python,6
Les trois fichiers suivants sont générés. result.csv-00000-of-00003 result.csv-00001-of-00003 result.csv-00002-of-00003
Le contenu de chaque fichier est le suivant. Puisque le traitement est effectué par un traitement parallèle distribué, quel contenu est émis vers quel fichier est aléatoire à chaque fois.
result.csv-00000-of-00003
KV{Python, 13}
result.csv-00001-of-00003
KV{Java, 6}
result.csv-00002-of-00003
KV{Go, 17}
Apache Beam 2.0.x avec Google Cloud Dataflow - Qiita commençant par IntelliJ et Gradle
Beam Programming Guide Combinez collection et valeur|Documentation Cloud Dataflow| Google Cloud Platform Référence API (https://beam.apache.org/documentation/sdks/javadoc/2.0.0/index.html?org/apache/beam/sdk/transforms/Combine.html)
Recommended Posts