[JAVA] Introduction à Apache Beam avec Google Cloud Dataflow (sur la série 2.0.x) ~ Basic Group By Key ~

Introduction à Apache Beam avec Google Cloud Dataflow (sur la série 2.0.x) ~ Basic Group By Key ~

Cette section décrit l'utilisation de base de GroupByKey, l'une des cinq principales transformations d'Apache Beam. J'aimerais pouvoir écrire sur CoGroupByKey etc. à un autre moment.

Pour connaître les bases d'Apache Beam et de Cloud Dataflow, cliquez ici (http://qiita.com/Sekky0905/items/381ed27fca9a16f8ef07)

Je l'ai écrit en référence au Official Beam Programming Guide.

Qu'est-ce que GroupByKey?

Opérations de réduction parallèles. Mélanger dans le style Map / Shuffle / Réduire GroupByKey, comme son nom l'indique, est une transformation principale qui «regroupe une collection par clé». Créez une nouvelle collection en combinant la collection clé-valeur avec plusieurs paires avec la même clé mais des valeurs différentes. Utile pour agréger des données ayant une clé commune.

multimap et uni-map

multimap Par exemple, supposons que vous ayez les clés Java, Python et Go. Une valeur est attribuée à chacune de la pluralité de clés. Cette carte avant la conversion est appelée multimap.

Java,1
Python,5
Go,1
Java,3
Java,2
Go,5
Python,2
Go,2
Go,9
Python,6

uni-map L'application de GroupByKey à la collection multimap clé-valeur ci-dessus donne les résultats suivants.

Java [1, 6, 8]
Python [2, 7]
Go[7, 8]

Après conversion, cette carte est appelée uni-map. Une carte d'une collection de nombres est affectée aux clés uniques Java, Python et Go.

Comment représenter la valeur-clé propre à Beam SDK for Java

Dans Beam SDK pour Java, la valeur-clé est exprimée différemment de Java normal. Représente un objet clé-valeur de type KV <K, V>.

J'ai en fait écrit le code

Fichier à lire

sample.txt


Java,1
Python,5
Go,1
Java,3
Java,2
Go,5
Python,2
Go,2
Go,9
Python,6

Code Java réel

Chaque processus est décrit comme un commentaire dans le code. Je n'utilise pas autant que possible la chaîne de méthodes pour prioriser la compréhension. Par conséquent, le code est redondant.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;


/**
 *Principale
 * Created by sekiguchikai on 2017/07/12.
 */
public class Main {
    /**
     *Objet de fonction
     *Donnée String str,Numéro de chaîne","Diviser par
     *Changer num en type entier et KV<String, Integer>Faire un moule
     */
    static class SplitWordsAndMakeKVFn extends DoFn<String, KV<String, Integer>> {
        @ProcessElement
        //ProcessContext est un objet qui représente l'entrée
        //Beam SDK le récupérera pour vous sans avoir à le définir vous-même
        public void processElement(ProcessContext c) {
            // ","Split avec
            String[] words = c.element().split(",");
            //Mot divisé[0]À K, mots[1]Vers un entier vers V
            c.output(KV.of(words[0], Integer.parseInt(words[1])));
        }
    }

    /**
     *Objet de fonction
     * KV<String, Iterable<Integer>Changer le type en type String
     */
    static class TransTypeFromKVAndMakeStringFn extends DoFn<KV<String, Iterable<Integer>>, String> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            //Convertir l'entrée en type String
            c.output(String.valueOf(c.element()));

        }

    }


    /**
     *Chemin des données d'entrée
     */
    private static final String INPUT_FILE_PATH = "./sample.txt";
    /**
     *Chemin de données de sortie
     */
    private static final String OUTPUT_FILE_PATH = "./result.csv";

    /**
     *Principale
     *Par souci de compréhension, je n'utilise pas autant que possible la chaîne de méthodes
     *Par conséquent, le code est redondant.
     *
     * @argument param args
     */
    public static void main(String[] args) {
        //Commencez par créer une option à définir dans Pipeline
        //Cette fois, nous allons le démarrer localement, alors spécifiez DirectRunner.
        //En mode local, DirectRunner est déjà la valeur par défaut, vous n'avez donc pas besoin de configurer un coureur
        PipelineOptions options = PipelineOptionsFactory.create();

        //Générer un pipeline en fonction de l'option
        Pipeline pipeline = Pipeline.create(options);

        //Lire les données d'entrée et PCollection à partir de là(Un ensemble de données dans le pipeline)Créer
        PCollection<String> lines = pipeline.apply(TextIO.read().from(INPUT_FILE_PATH));

        //Donnée String str,Numéro de chaîne","Diviser avec, changer num en type entier, KV<String, Integer>Faire un moule
        PCollection<KV<String, Integer>> kvCounter = lines.apply(ParDo.of(new SplitWordsAndMakeKVFn()));

        //Avec GroupByKey,{Go, [2, 9, 1, 5]}Forme comme
               // GroupByKey.<K, V>create())Avec GroupByKey<K, V>Génère
        PCollection<KV<String, Iterable<Integer>>> groupedWords = kvCounter.apply(
                GroupByKey.<String, Integer>create());

        //Pour la sortie<KV<String, Iterable<Integer>>>Conversion de type en type chaîne
        PCollection<String> output = groupedWords.apply(ParDo.of(new TransTypeFromKVAndMakeStringFn()));

        //Écrire
        output.apply(TextIO.write().to(OUTPUT_FILE_PATH));

        // run :Exécuter avec Runner spécifié par l'option PipeLine
        // waitUntilFinish :Attendez que PipeLine se termine et retourne l'état final
        pipeline.run().waitUntilFinish();
    }
}

Au fait, en utilisant la chaîne de méthodes, cela ressemble à ceci. C'était plutôt chouette.


import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;


/**
 *Principale
 * Created by sekiguchikai on 2017/07/12.
 */
public class Main {
    /**
     *Objet de fonction
     *Donnée String str,Numéro de chaîne","Diviser par
     *Changer num en type entier et KV<String, Integer>Faire un moule
     */
    static class SplitWordsAndMakeKVFn extends DoFn<String, KV<String, Integer>> {
        @ProcessElement
        //ProcessContext est un objet qui représente l'entrée
        //Beam SDK le récupérera pour vous sans avoir à le définir vous-même
        public void processElement(ProcessContext c) {
            // ","Split avec
            String[] words = c.element().split(",");
            //Mot divisé[0]À K, mots[1]Vers un entier vers V
            c.output(KV.of(words[0], Integer.parseInt(words[1])));
        }
    }

    /**
     *Objet de fonction
     * KV<String, Iterable<Integer>Changer le type en type String
     */
    static class TransTypeFromKVAndMakeStringFn extends DoFn<KV<String, Iterable<Integer>>, String> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            //Convertir l'entrée en type String
            c.output(String.valueOf(c.element()));

        }

    }


    /**
     *Chemin des données d'entrée
     */
    private static final String INPUT_FILE_PATH = "./sample.txt";
    /**
     *Chemin de données de sortie
     */
    private static final String OUTPUT_FILE_PATH = "./result.csv";

    /**
     *Principale
     *Par souci de compréhension, je n'utilise pas autant que possible la chaîne de méthodes
     *Par conséquent, le code est redondant.
     *
     * @argument param args
     */
    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());

        //Comment écrire en utilisant la chaîne de méthodes
        pipeline.apply(TextIO.read().from(INPUT_FILE_PATH))
                .apply(ParDo.of(new SplitWordsAndMakeKVFn()))
                .apply(GroupByKey.<String, Integer>create())
                .apply(ParDo.of(new TransTypeFromKVAndMakeStringFn()))
                .apply(TextIO.write().to(OUTPUT_FILE_PATH));

        // run :Exécuter avec Runner spécifié par l'option PipeLine
        // waitUntilFinish :Attendez que PipeLine se termine et retourne l'état final
        pipeline.run().waitUntilFinish();
    }
}

Résultat d'exécution

Les trois fichiers suivants sont générés. result.csv-00000-of-00003 result.csv-00001-of-00003 result.csv-00002-of-00003

Le contenu de chaque fichier est le suivant. Le traitement étant effectué par un traitement parallèle distribué, il existe des fichiers avec un contenu vierge et un ou deux contenus. En outre, quel contenu est sorti vers quel fichier est aléatoire à chaque fois.

result.csv-00000-of-00003 Pas de contenu

result.csv-00001-of-00003

KV{Java, [1, 3, 2]}

result.csv-00002-of-00003

KV{Go, [5, 2, 9, 1]}
KV{Python, [5, 2, 6]}

Article associé

Introduction à Apache Beam avec Cloud Dataflow (sur la série 2.0.0) ~ Partie de base ~ ParDo ~ --Qiita

Apache Beam 2.0.x avec Google Cloud Dataflow --Qiita commençant par IntelliJ et Gradle

Le site que j'ai utilisé comme référence

Beam Programming Guide

Combiner avec GroupByKey|Documentation Cloud Dataflow|  Google Cloud Platform

Recommended Posts

Introduction à Apache Beam avec Google Cloud Dataflow (sur la série 2.0.x) ~ Basic Group By Key ~
Introduction à Apache Beam avec Google Cloud Dataflow (sur la série 2.0.x) ~ Combine Edition ~
Introduction à Apache Beam avec Cloud Dataflow (sur la série 2.0.0) ~ Partie de base ~ ParDo ~
Apache Beam 2.0.x avec Google Cloud Dataflow commençant par IntelliJ et Gradle
Présentation pratique d'Apache Beam (Dataflow) [Python]