[JAVA] Créez un environnement de traitement de données avec Google Cloud DataFlow + Pub / Sub

Un résumé d'introduction que j'ai examiné rapidement lors de l'utilisation de Cloud DataFlow au travail. Je ne suis pas un expert dans le domaine de l'informatique, j'ai donc commis des erreurs dans la description. Si vous l'avez, vous apprendrez donc si vous pouvez me le dire :)

Qu'est-ce que Cloud Data Flow?

Il s'agit d'un service entièrement géré de moteur de traitement de données par flux / par lots (Apache Beam) fourni par Google Cloud Platform.

Qu'est-ce que c'est? Parce que c'est comme Si vous décomposez grossièrement les éléments

Feeling (*) Depuis le 02/02/2018, seul Java prend en charge les flux

En utilisant Cloud Pub / Sub, qui est le même service GCP, en tant que bus de messages, En combinant avec des magasins de données tels que CloudIoTCore (MQTT Protocol Bridge), GCS et BigQuery, par exemple, la transmission de données à partir de terminaux peut être reçue, convertie et intégrée.

Un modèle d'utilisation approximatif vu à partir des exemples d'images de Google.

diagram-dataflow-2x.png

Pratique Cloud DataFlow

Il est difficile à comprendre à moins que vous ne l'utilisiez réellement, donc cette fois nous allons le construire en utilisant ce flux comme exemple.

Untitled.png

Comme je voulais écrire la coopération autour de PubSub, j'ai défini l'entrée de CloudPubSub, mais bien sûr, il est possible de traiter les données des tables A et B dans BigQuery et de les charger dans la table C.

Comment créer un environnement de développement

Téléchargez le SDK avec les éléments suivants dans pom.xml.

    <dependency>
      <groupId>com.google.cloud.dataflow</groupId>
      <artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
      <version>[2.2.0, 2.99)</version>
    </dependency>

Si vous n'aimez pas maven, ou si vous avez quelque chose comme ça, consultez Officiel pour plus d'informations. S'il vous plaît.

Écrire du code pour les paramètres de comportement

Tout d'abord, définissez les options du pipeline à traiter.

        DataflowPipelineOptions options = PipelineOptionsFactory.create()
                .as(DataflowPipelineOptions.class);
        //Spécifiez le nom du projet que vous utilisez
        options.setProject("your gcp porject");
        //Créer et spécifier le bucket GCS utilisé par Dataflow pour la préparation
        options.setStagingLocation("gs://hoge/staging");
        //Créer et spécifier un bucket GCS que Dataflow utilise temporairement
        options.setTempLocation("gs://hoge/tmp");
        //Spécifiez le coureur à exécuter. Spécifiez DataflowRunner lors de l'exécution sur GCP. DirectRunner pour une exécution locale
        options.setRunner(DataflowRunner.class);
        //Activer le streaming
        options.setStreaming(true);
        //Spécifiez le nom au moment de l'opération(Les travaux portant le même nom ne peuvent pas être exécutés en même temps
        options.setJobName("sample");

Il y a aussi une explication dans Official, donc seulement un aperçu.

(*) Dans l'exemple ci-dessus, je voulais séparer le paramètre du code, donc j'utilise ResourceBundle de Java, mais si cela fonctionne, cela fonctionnera avec la valeur directe.

Concepts de base qui composent la programmation sur Dataflow

En gros, ce qui suit pour la programmation des tâches Dataflow Vous vous occuperez du concept.

En gros, appliquez le processus de conversion que vous avez écrit à PipeLine et le traitement nécessaire tel que PipeLine I / O. Vous construirez un emploi.

Ecrire le code de la lecture de l'entrée à la sortie

Trois actions décrites dans la figure ci-dessus

Sera appliqué au pipeline.

        //Pipeline (tâche à traiter))Créer un objet
        Pipeline p = Pipeline.create(options);
        TableSchema schema = SampleSchemaFactory.create();
        //Appliquer le contenu de traitement
        //Lire les données de l'abonnement pubsub
        p.apply(PubsubIO.readStrings().fromSubscription("your pubsub subscription"))
        //Spécifiez les fenêtres toutes les 5 minutes(Non requis)
                .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(5))))
        //Définir la conversion pour l'entrée de pubsub(La mise en œuvre sera décrite plus tard)
                .apply(ParDo.of(new BigQueryRowConverter()))
        //Configurer pour écrire dans BigQuery
                .apply("WriteToBQ", BigQueryIO.writeTableRows()
                        //Spécifiez le nom de la table de destination d'écriture
                        .to(TableDestination("dataset_name:table_name","description"))
                        //Définissez le schéma sur lequel écrire avec Object et transmettez-le
                        .withSchema(schema)
                        //Créer s'il n'y a pas de table(option)
                        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                        //Insérer des données à la fin du tableau (facultatif))
                        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
        //Courir
        p.run();

Dans le cas d'une conversion utilisant ParDo, il hérite de DoFn et implémente la récupération de données ~ conversion en PCollection dans processElement qui est une méthode abstraite.

package com.mycompany.dataflow_sample.converter;

import com.google.api.services.bigquery.model.TableRow;
import com.google.gson.Gson;
import com.mycompany.dataflow_sample.entity.SampleInputJson;
import org.apache.beam.sdk.transforms.DoFn;

public class BigQueryRowConverter extends DoFn<String,TableRow> {

    @ProcessElement
    public void processElement(ProcessContext dofn) throws Exception {
      //Recevoir des entrées
      String json = dofn.element();
      Gson gson = new Gson();
      //Convertir json en objet
      SampleInputJson jsonObj = gson.fromJson(json,SampleInputJson.class);
      //Convertir le contenu de json en tableRow de bigquery
      TableRow output = new TableRow();
      TableRow attributesOutput = new TableRow();
      TableRow attr2Output = new TableRow();
      //Définir les données en sortie
      attributesOutput.set("attr1", jsonObj.attributes.attr1);
      attributesOutput.set("attr2", jsonObj.attributes.attr2);
      attr2Output.set("attr2_prop1",jsonObj.attributes.attr2.prop1);
      attr2Output.set("attr2_prop2",jsonObj.attributes.attr2.prop2);
      
      attributesOutput .set("attr2",attr2Output);
      output.set("attributes", attributesOutput );
      output.set("name", jsonObj.name);
      output.set("ts", jsonObj.timeStamp/1000);
      //Production
      dofn.output(output);
    }
}
package com.mycompany.dataflow_sample.schema;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.ArrayList;
import java.util.List;

public class SampleSchemaFactory {
    public static TableSchema create() {
        List<TableFieldSchema> fields;
        fields = new ArrayList<> ();
        fields.add(new TableFieldSchema().setName("name").setType("STRING"));
        fields.add(new TableFieldSchema().setName("ts").setType("TIMESTAMP"));
        fields.add(new TableFieldSchema().setName("attributes").setType("RECORD")
                .setFields(new ArrayList<TableFieldSchema>() {
                    {
                        add(new TableFieldSchema().setName("attr1").setType("STRING"));
                        add(new TableFieldSchema().setName("attr2").setType("RECORD")
                            .setFields(new ArrayList<TableFieldSchema>() {
                                {
                                    add(new TableFieldSchema().setName("prop1").setType("INTEGER"));
                                    add(new TableFieldSchema().setName("prop2").setType("STRING"));
                                }
                            })
                        );
                    }
                })
        );
        TableSchema schema = new TableSchema().setFields(fields);

        return schema;
    }
}

Comme ça.

Déployer / tester

Créez et exécutez simplement le code Java que vous avez écrit précédemment et la tâche sera en fait déployée sur GCP.

dataflow.png

Envoyez un message depuis la console GCP.

pubsub.png

La vérification de l'opération est OK si des données sont insérées dans BigQuery :)

Consultez le journal

Vous pouvez consulter le journal à partir de l'écran des détails en cliquant sur le travail créé sur la console. Les journaux sont également automatiquement transférés vers stackdriver, vous devez donc l'utiliser pour la surveillance et la surveillance.

detail.png

Comportement au moment de l'erreur (occurrence d'exception)

Si une exception se produit lors de l'exécution de DataflowJob, ACK ne sera pas envoyé à PubSub, les données seront donc à nouveau récupérées.

Donc

échantillon

L'exemple est ici. Je voulais le partitionner, donc je l'ai fait pour créer une table quotidienne basée sur le quotidien au moment de l'exécution.

référence

Collection d'exemples de code officielle (github) apache beam sdk officiel google

Recommended Posts

Créez un environnement de traitement de données avec Google Cloud DataFlow + Pub / Sub
Créer un environnement Node.js avec Docker
Créer un environnement Tomcat 8.5 avec Pleiades 4.8
Créer un environnement de développement PureScript avec Docker
Créer un environnement de développement Wordpress avec Docker
Créer un environnement Laravel / Docker avec VSCode devcontainer
Créez rapidement un environnement de développement WordPress avec Docker
[Win10] Créer un environnement de développement JSF avec NetBeans
Créer un environnement de développement Java avec VS Code
Créer un environnement de développement Ruby on Rails sur AWS Cloud9
[Note] Créez un environnement Python3 avec Docker dans EC2
Créez un référentiel interne Maven sur Google Cloud Storage
Créez un environnement de développement «Spring Thorough Introduction» avec IntelliJ IDEA
Créez un environnement virtuel CentOS 8 sur votre Mac à l'aide de VirtualBox
Créer un environnement Docker avec WSL
Ruby ① Création d'un environnement Windows
Créer un environnement de développement pour Django + MySQL + nginx avec Docker Compose
Étapes pour créer un environnement de développement Ruby on Rails avec Vagrant
Créez un environnement Vue3 avec Docker!
Créer un environnement local Couchbase avec Docker
Construire un projet Java avec Gradle
Créer une application Web avec Javalin
Utiliser Java 11 avec Google Cloud Functions
[Google Cloud] Premiers pas avec Docker
Créer un environnement XAMPP sur Ubuntu
Google Cloud Platform avec Spring Boot 2.0.0
Créer un environnement de développement Jooby avec Eclipse
Modèle Cloud Dataflow créé avec Scala
Créez un environnement Docker + Laravel avec Laradock
Créez un environnement de test d'application Windows avec Selenium Grid, Appium et Windows Application Driver
[Copier et coller] Créez un environnement de développement Laravel avec Docker Compose, partie 2
Comment créer un environnement de développement Ruby on Rails avec Docker (Rails 6.x)
Créez un environnement de développement local pour les didacticiels Rails avec Docker (Rails 6 + PostgreSQL + Webpack)
Créez une API de tableau d'affichage avec certification et autorisation avec Rails 6 # 1 Construction de l'environnement
Créez un environnement de développement sur AWS EC2 avec CentOS7 + Nginx + pm2 + Nuxt.js
Comment créer un environnement de développement Ruby on Rails avec Docker (Rails 5.x)
Modèle: créer un environnement de développement Ruby / Rails avec un conteneur Docker (version Ubuntu)
Modèle: créer un environnement de développement Ruby / Rails avec un conteneur Docker (version Mac)