[JAVA] Introduction à Apache Beam avec Google Cloud Dataflow (sur la série 2.0.x) ~ Combine Edition ~

Introduction à Apache Beam avec Google Cloud Dataflow (sur la série 2.0.x) ~ Combine ~

Cette section décrit l'utilisation de base de Combine, l'une des cinq principales transformations d'Apache Beam. L'histoire de base des autres Core Transforms et Apache Beam 2.0.x en premier lieu est décrite ci-dessous.

Apache Beam 2.0.x avec Google Cloud Dataflow - Qiita commençant par IntelliJ et Gradle

Introduction à Apache Beam avec Cloud Dataflow (sur la série 2.0.0) ~ Partie de base ~ ParDo ~ --Qiita

Introduction à Apache Beam avec Google Cloud Dataflow (sur la série 2.0.x) ~ Basic Group By Key ~ --Qiita

Cet article est rédigé en référence aux deux documents officiels suivants.

Beam Programming Guide

Combinez collection et valeur|Documentation Cloud Dataflow| Google Cloud Platform

Deux rôles de Combine

Combine combine ou fusionne chaque élément (chaque donnée) existant dans la PCollection. Je reconnais que c'est comme Réduire dans Map / Shuffle / Réduire.

Il existe deux manières principales de combiner. "Comment combiner des éléments existant dans une PCollection pour générer une valeur" et "Comment combiner chaque élément de la partie Value de PCollection groupé par clé pour générer une valeur" Est. Ci-dessous, je voudrais décrire chaque méthode.

Comment combiner des éléments qui existent dans une PCollection pour générer une valeur

Comment combiner des éléments existant dans une PCollection pour générer une valeur

Combinez chaque élément de la PCollection. => Il convient de noter que c'est différent de ParDo. ParDo effectue un certain traitement pour chaque élément de la PCollection. Combine combine chaque élément de la PCollection.

Par exemple, c'est le cas où les éléments existant dans la PCollection sont combinés pour générer une valeur.


PCollection<Integer> sum = pCollection.apply(Sum.integersGlobally());

À première vue, Combine ne semble pas exister, mais Sum.integersGlobally () enveloppe Combine.globally. Le Sum.integersGlobally () réel est ci-dessous.


public static Combine.Globally<Integer, Integer> integersGlobally() {
  return Combine.globally(Sum.ofIntegers());}

référence Référence API (https://beam.apache.org/documentation/sdks/javadoc/2.0.0/index.html?org/apache/beam/sdk/transforms/Combine.html)

withoutDefaults() Si vous voulez retourner vide quand une PCollection vide est donnée comme inout, ajoutez withoutDefaults ().


PCollection<Integer> sum = integerPCollection.apply(Sum.integersGlobally().withoutDefaults());

Différences de comportement entre la fenêtre globale et la fenêtre non globale

Dans le cas de Global Window, le comportement par défaut est de renvoyer une PCollection contenant un élément.

En revanche, dans le cas d'une fenêtre non globale, l'opération par défaut ci-dessus n'est pas effectuée. Spécifiez l'option lors de l'utilisation de la combinaison. La formule était facile à comprendre, alors je la cite ci-dessous. (Au moment de la rédaction de cet article, le document pour Apache Beam 2.0.x n'avait pas encore cette description. Google Cloud Dataflow 1.9 est cité dans le document officiel)

Spécifiez> .withoutDefaults. Dans ce cas, la fenêtre vide de l'entrée PCollection sera également vide dans Output> Collection.

Spécifiez> .asSingletonView. Dans ce cas, la sortie est immédiatement convertie en PCollectionView. Il s'agit de la valeur par défaut lorsque chaque fenêtre vide est utilisée comme entrée secondaire. En général, cette option ne doit être utilisée que si le résultat de la combinaison du pipeline est utilisé ultérieurement comme entrée secondaire dans le pipeline.

Source du devis: Combinez collection et valeur|Documentation Cloud Dataflow| Google Cloud Platform

J'ai en fait écrit le code

Chaque processus est décrit comme un commentaire dans le code. Je n'utilise pas autant que possible la chaîne de méthodes pour prioriser la compréhension. Par conséquent, le code est redondant.

package com.company;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.PCollection;

/**
 *Classe principale
 */
public class Main {
    /**
     *Objet fonctionnel
     * String =>Effectuer une conversion de type entier
     */
    static class TransformTypeFromStringToInteger extends DoFn<String, Integer> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            //Chaîne l'élément=>Convertir en entier et sortie
            c.output(Integer.parseInt(c.element()));
        }
    }

    /**
     *Objet fonctionnel
     * Integer =>Effectuer la conversion de type de chaîne
     */
    static class TransformTypeFromIntegerToString extends DoFn<Integer, String> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            //Chaîne l'élément=>Convertir en entier et sortie
            System.out.println(c.element());
            c.output(String.valueOf(c.element()));
        }
    }


    /**
     *Chemin des données d'entrée
     */
    private static final String INPUT_FILE_PATH = "./sample.txt";
    /**
     *Chemin de données de sortie
     */
    private static final String OUTPUT_FILE_PATH = "./result.txt";

    /**
     *N'utilisez pas autant que possible les chaînes de méthodes pour comprendre
     *Par conséquent, il existe des pièces redondantes
     *Méthode principale
     *
     * @param args
     */
    public static void main(String[] args) {
        //Générer un pipeline en spécifiant une option
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());

        System.out.println("a");
        //Lire à partir d'un fichier
        PCollection<String> lines = pipeline.apply(TextIO.read().from(INPUT_FILE_PATH));
        //Chaîne chaque donnée lue=>Convertir en entier
        PCollection<Integer> integerPCollection = lines.apply(ParDo.of(new TransformTypeFromStringToInteger()));
        // Combine.Résumez chaque élément de PCollection dans le monde
        //Pour une PCollection vide, si vous voulez retourner vide=> PCollection<Integer> sum = integerPCollection.apply(Sum.integersGlobally().withoutDefaults());
        PCollection<Integer> sum = integerPCollection.apply(Sum.integersGlobally().withoutDefaults());
        // PCollection<Integer>somme Entier=>Convertir en chaîne
        PCollection<String> sumString = sum.apply(ParDo.of(new TransformTypeFromIntegerToString()));
        //Écrire dans un fichier
        sumString.apply(TextIO.write().to(OUTPUT_FILE_PATH));

        //Courir
        pipeline.run().waitUntilFinish();
    }
}

J'ai écrit le code pour l'implémentation (ver en utilisant la chaîne de méthodes)

C'était assez propre

package com.company;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;


/**
 *Classe principale
 */
public class Main {
    /**
     *Objet fonctionnel
     * String =>Effectuer une conversion de type entier
     */
    static class TransformTypeFromStringToInteger extends DoFn<String, Integer> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            //Chaîne l'élément=>Convertir en entier et sortie
            c.output(Integer.parseInt(c.element()));
        }
    }

    /**
     *Objet fonctionnel
     * Integer =>Effectuer la conversion de type de chaîne
     */
    static class TransformTypeFromIntegerToString extends DoFn<Integer, String> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            //Chaîne l'élément=>Convertir en entier et sortie
            System.out.println(c.element());
            c.output(String.valueOf(c.element()));
        }
    }


    /**
     *Chemin des données d'entrée
     */
    private static final String INPUT_FILE_PATH = "./sample.txt";
    /**
     *Chemin de données de sortie
     */
    private static final String OUTPUT_FILE_PATH = "./result.txt";

    /**
     *Méthode principale
     *
     * @param args
     */
    public static void main(String[] args) {
        //Génération de pipeline
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());

        //Partie de traitement
        pipeline.apply(TextIO.read().from(INPUT_FILE_PATH))
                .apply(ParDo.of(new TransformTypeFromStringToInteger()))
                .apply(Sum.integersGlobally().withoutDefaults())
                .apply(ParDo.of(new TransformTypeFromIntegerToString()))
                .apply(TextIO.write().to(OUTPUT_FILE_PATH));

        //Courir
        pipeline.run().waitUntilFinish();
    }
}

Fichier chargé

samole.txt


1
2
3
4
5
6
7
8
9
10

Résultat d'exécution

result.txt-00000-of-00001 est sorti Le contenu de result.txt-00000-of-00001

55

Que fais tu

10
Σk
k = 1

C'est comme ça.

PerKey Lorsque GroupByKey est exécuté, il devient K, V (Iterable Collection). Par exemple:

Java [1, 2, 3]

La PerKey de Combine combine la partie V [Collection Iterable] de cette K, V [Collection Iterable] pour chaque Clé. Ainsi, par exemple, si vous combinez K et V (Iterable Collection) après GroupByKey ci-dessus, ce sera comme suit.

Java [6]

Tous les éléments de V (Collection Iterable) de K et V (Collection Iterable) sont combinés.

J'ai en fait écrit le code

Chaque processus est décrit comme un commentaire dans le code. Je n'utilise pas autant que possible la chaîne de méthodes pour prioriser la compréhension. Par conséquent, le code est redondant.

package com.company;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

/**
 *Principale
 */
public class Main {
    /**
     *Objet de fonction
     *Donnée String str,Numéro de chaîne","Diviser par
     *Changer num en type entier et KV<String, Integer>Faire un moule
     */
    static class SplitWordsAndMakeKVFn extends DoFn<String, KV<String, Integer>> {
        @ProcessElement
        //ProcessContext est un objet qui représente l'entrée
        //Beam SDK le récupérera pour vous sans avoir à le définir vous-même
        public void processElement(ProcessContext c) {
            // ","Split avec
            String[] words = c.element().split(",");
            //Mot divisé[0]À K, mots[1]Vers un entier vers V
            c.output(KV.of(words[0], Integer.parseInt(words[1])));
        }
    }


    /**
     *Objet de fonction
     * KV<String, Iterable<Integer>Changer le type en type String
     */
    static class TransTypeFromKVAndMakeStringFn extends DoFn<KV<String, Integer>, String> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            //Convertir l'entrée en type String
            c.output(String.valueOf(c.element()));

        }

    }


    /**
     *Chemin des données d'entrée
     */
    private static final String INPUT_FILE_PATH = "./sample.txt";
    /**
     *Chemin de données de sortie
     */
    private static final String COMBINE_OUTPUT_FILE_PATH = "./src/main/resources/combine_result/result.csv";

    /**
     *Principale
     *Par souci de compréhension, je n'utilise pas autant que possible la chaîne de méthodes
     *Par conséquent, le code est redondant.
     *
     * @argument param args
     */
    public static void main(String[] args) {
        //Commencez par créer une option à définir dans Pipeline
        //Cette fois, puisqu'il sera démarré localement, spécifiez DirectRunner.
        //En mode local, DirectRunner est déjà la valeur par défaut, vous n'avez donc pas besoin de configurer un coureur
        PipelineOptions options = PipelineOptionsFactory.create();

        //Générer un pipeline en fonction de l'option
        Pipeline pipeline = Pipeline.create(options);

        //Lire les données d'entrée et PCollection à partir de là(Un ensemble de données dans le pipeline)Créer
        PCollection<String> lines = pipeline.apply(TextIO.read().from(INPUT_FILE_PATH));

        //Donnée String str,Numéro de chaîne","Diviser avec, changer num en type entier, KV<String, Integer>Faire un moule
        PCollection<KV<String, Integer>> kvCounter = lines.apply(ParDo.of(new SplitWordsAndMakeKVFn()));

        //Combine PerKey effectue une conversion GroupByKey dans le cadre de l'opération
        PCollection<KV<String, Integer>> sumPerKey = kvCounter
                .apply(Sum.integersPerKey());
        
        //Convertir PCollection en un formulaire pouvant être généré dans un fichier
        PCollection<String> output = sumPerKey.apply(ParDo.of(new TransTypeFromKVAndMakeStringFn()));

        //Écrire
        output.apply(TextIO.write().to(COMBINE_OUTPUT_FILE_PATH));

        // run :Exécuter avec Runner spécifié par l'option PipeLine
        // waitUntilFinish :Attendez que PipeLine se termine et retourne l'état final
        pipeline.run().waitUntilFinish();
    }


}

J'ai écrit le code pour l'implémentation (ver en utilisant la chaîne de méthodes)

C'était assez propre

package com.company;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;

/**
 *Principale
 */
public class Main {
    /**
     *Objet de fonction
     *Donnée String str,Numéro de chaîne","Diviser par
     *Changer num en type entier et KV<String, Integer>Faire un moule
     */
    static class SplitWordsAndMakeKVFn extends DoFn<String, KV<String, Integer>> {
        @ProcessElement
        //ProcessContext est un objet qui représente l'entrée
        //Beam SDK le récupérera pour vous sans avoir à le définir vous-même
        public void processElement(ProcessContext c) {
            // ","Split avec
            String[] words = c.element().split(",");
            //Mot divisé[0]À K, mots[1]Vers un entier vers V
            c.output(KV.of(words[0], Integer.parseInt(words[1])));
        }
    }


    /**
     *Objet de fonction
     * KV<String, Iterable<Integer>Changer le type en type String
     */
    static class TransTypeFromKVAndMakeStringFn extends DoFn<KV<String, Integer>, String> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            //Convertir l'entrée en type String
            c.output(String.valueOf(c.element()));

        }

    }


    /**
     *Chemin des données d'entrée
     */
    private static final String INPUT_FILE_PATH = "./sample.txt";
    /**
     *Chemin de données de sortie
     */
    private static final String COMBINE_OUTPUT_FILE_PATH = "./src/main/resources/combine_result/result.csv";

    /**
     *Principale
     * @argument param args
     */
    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());
        pipeline
                .apply(TextIO.read().from(INPUT_FILE_PATH))
                .apply(ParDo.of(new SplitWordsAndMakeKVFn()))
                .apply(Sum.integersPerKey())
                .apply(ParDo.of(new TransTypeFromKVAndMakeStringFn()))
                .apply(TextIO.write().to(COMBINE_OUTPUT_FILE_PATH));
        pipeline.run().waitUntilFinish();
    }


}

Fichier chargé

samole.txt


Java,1
Python,5
Go,1
Java,3
Java,2
Go,5
Python,2
Go,2
Go,9
Python,6

Résultat d'exécution

Les trois fichiers suivants sont générés. result.csv-00000-of-00003 result.csv-00001-of-00003 result.csv-00002-of-00003

Le contenu de chaque fichier est le suivant. Puisque le traitement est effectué par un traitement parallèle distribué, quel contenu est émis vers quel fichier est aléatoire à chaque fois.

result.csv-00000-of-00003

KV{Python, 13}

result.csv-00001-of-00003

KV{Java, 6}

result.csv-00002-of-00003

KV{Go, 17}

Article associé

Apache Beam 2.0.x avec Google Cloud Dataflow - Qiita commençant par IntelliJ et Gradle

Introduction à Apache Beam avec Cloud Dataflow (sur la série 2.0.0) ~ Partie de base ~ ParDo ~ --Qiita

Introduction à Apache Beam avec Google Cloud Dataflow (sur la série 2.0.x) ~ Basic Group By Key ~ --Qiita

Le site que j'ai utilisé comme référence

Beam Programming Guide Combinez collection et valeur|Documentation Cloud Dataflow| Google Cloud Platform Référence API (https://beam.apache.org/documentation/sdks/javadoc/2.0.0/index.html?org/apache/beam/sdk/transforms/Combine.html)

Recommended Posts

Introduction à Apache Beam avec Google Cloud Dataflow (sur la série 2.0.x) ~ Combine Edition ~
Introduction à Apache Beam avec Google Cloud Dataflow (sur la série 2.0.x) ~ Basic Group By Key ~
Apache Beam 2.0.x avec Google Cloud Dataflow commençant par IntelliJ et Gradle
Présentation pratique d'Apache Beam (Dataflow) [Python]
Matériel à lire lors de la mise en route d'Apache Beam
Importez et supprimez des fichiers dans Google Cloud Storages avec django-storage
Introduction à RDB avec sqlalchemy Ⅰ
Qu'est-ce que Google Cloud Dataflow?
Série: Introduction à cx_Oracle Contents
Modifications pour exécuter "Utilisation de Cloud Dataflow + Cloud Pub / Sub + Fluentd ..." avec le SDK 2.1
Comment se connecter à Cloud Firestore à partir de Google Cloud Functions avec du code Python