[JAVA] Erstellen Sie eine Datenverarbeitungsumgebung mit Google Cloud DataFlow + Pub / Sub

Eine einführende Zusammenfassung, die ich mir bei der Verwendung von Cloud DataFlow bei der Arbeit kurz angesehen habe. Ich bin kein Experte auf dem Gebiet der Datenverarbeitung, daher habe ich einige Fehler in der Beschreibung gemacht. Wenn du es hast, wirst du es lernen, wenn du es mir sagen kannst :)

Was ist Cloud-Datenfluss?

Es handelt sich um einen vollständig verwalteten Dienst der Stream- / Batch-Datenverarbeitungs-Engine (Apache Beam), der von Google Cloud Platform bereitgestellt wird.

Was ist das? Weil es sich anfühlt wie Wenn Sie die Elemente grob zerlegen

Gefühl (*) Ab dem 02/02/2018 unterstützt nur Java Streams

Verwenden von Cloud Pub / Sub, dem gleichen GCP-Dienst, als Nachrichtenbus Durch die Kombination mit Datenspeichern wie CloudIoTCore (MQTT Protocol Bridge), GCS und BigQuery kann beispielsweise die Datenübertragung von Terminals empfangen, konvertiert und integriert werden.

Ein grobes Nutzungsmodell aus den Beispielbildern von Google.

diagram-dataflow-2x.png

Cloud DataFlow-Praxis

Es ist schwer zu verstehen, wenn Sie es nicht tatsächlich verwenden. Dieses Mal werden wir es anhand dieses Ablaufs als Beispiel erstellen.

Untitled.png

Da ich die Zusammenarbeit rund um PubSub schreiben wollte, habe ich den Zugang zu CloudPubSub festgelegt, aber natürlich ist es möglich, die Daten von Tabelle A und Tabelle B in BigQuery zu verarbeiten und in Tabelle C zu laden.

So erstellen Sie eine Entwicklungsumgebung

Laden Sie das SDK wie folgt in pom.xml herunter.

    <dependency>
      <groupId>com.google.cloud.dataflow</groupId>
      <artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
      <version>[2.2.0, 2.99)</version>
    </dependency>

Wenn Sie einen Maven oder ähnliches haben, überprüfen Sie Offiziell auf weitere Informationen. Bitte.

Schreiben Sie Code für Verhaltenseinstellungen

Legen Sie zunächst die Optionen der zu verarbeitenden Pipeline fest.

        DataflowPipelineOptions options = PipelineOptionsFactory.create()
                .as(DataflowPipelineOptions.class);
        //Geben Sie den verwendeten Projektnamen an
        options.setProject("your gcp porject");
        //Erstellen und spezifizieren Sie den GCS-Bucket, der von Dataflow für das Staging verwendet wird
        options.setStagingLocation("gs://hoge/staging");
        //Erstellen und geben Sie einen GCS-Bucket an, den Dataflow vorübergehend verwendet
        options.setTempLocation("gs://hoge/tmp");
        //Geben Sie den auszuführenden Läufer an. Geben Sie DataflowRunner an, wenn Sie GCP ausführen. DirectRunner für die lokale Ausführung
        options.setRunner(DataflowRunner.class);
        //Streaming aktivieren
        options.setStreaming(true);
        //Geben Sie den Namen zum Zeitpunkt des Betriebs an(Jobs mit demselben Namen können nicht gleichzeitig ausgeführt werden
        options.setJobName("sample");

Es gibt auch eine Erklärung in Official, also nur eine grobe Übersicht.

(*) Im obigen Beispiel wollte ich die Einstellung vom Code trennen, daher verwende ich Javas ResourceBundle, aber wenn es nur funktioniert, funktioniert es mit dem direkten Wert.

Grundlegende Konzepte für die Programmierung in Dataflow

Grundsätzlich das Folgende zum Programmieren von Datenflussjobs Sie werden sich mit dem Konzept beschäftigen.

Wenden Sie grundsätzlich den Konvertierungsprozess an, den Sie in PipeLine geschrieben haben, und die erforderliche Verarbeitung wie PipeLine I / O. Sie werden einen Job aufbauen.

Schreiben Sie den Code vom Lesen der Eingabe zur Ausgabe

Drei in der obigen Abbildung beschriebene Aktionen

Wird auf die Pipeline angewendet.

        //Pipeline (zu verarbeitender Job))Objekt erstellen
        Pipeline p = Pipeline.create(options);
        TableSchema schema = SampleSchemaFactory.create();
        //Wenden Sie den Verarbeitungsinhalt an
        //Lesen Sie die Daten aus dem Pubsub-Abonnement
        p.apply(PubsubIO.readStrings().fromSubscription("your pubsub subscription"))
        //Geben Sie alle 5 Minuten Fenster an(Nicht benötigt)
                .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(5))))
        //Konvertierung für Eingabe von pubsub einstellen(Die Implementierung wird später beschrieben)
                .apply(ParDo.of(new BigQueryRowConverter()))
        //Stellen Sie ein, um in BigQuery zu schreiben
                .apply("WriteToBQ", BigQueryIO.writeTableRows()
                        //Geben Sie den Namen der Schreibzieltabelle an
                        .to(TableDestination("dataset_name:table_name","description"))
                        //Definieren Sie das Schema, in das mit Object geschrieben werden soll, und übergeben Sie es
                        .withSchema(schema)
                        //Erstellen, wenn keine Tabelle vorhanden ist(Möglichkeit)
                        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                        //Daten am Ende der Tabelle einfügen (optional))
                        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
        //Lauf
        p.run();

Bei der Konvertierung mit ParDo erbt es DoFn und implementiert die Datenabrufkonvertierung in PCollection in processElement, einer abstrakten Methode.

package com.mycompany.dataflow_sample.converter;

import com.google.api.services.bigquery.model.TableRow;
import com.google.gson.Gson;
import com.mycompany.dataflow_sample.entity.SampleInputJson;
import org.apache.beam.sdk.transforms.DoFn;

public class BigQueryRowConverter extends DoFn<String,TableRow> {

    @ProcessElement
    public void processElement(ProcessContext dofn) throws Exception {
      //Eingaben empfangen
      String json = dofn.element();
      Gson gson = new Gson();
      //Konvertieren Sie json in ein Objekt
      SampleInputJson jsonObj = gson.fromJson(json,SampleInputJson.class);
      //Konvertieren Sie den Inhalt von json in tableRow of bigquery
      TableRow output = new TableRow();
      TableRow attributesOutput = new TableRow();
      TableRow attr2Output = new TableRow();
      //Stellen Sie die Daten in der Ausgabe ein
      attributesOutput.set("attr1", jsonObj.attributes.attr1);
      attributesOutput.set("attr2", jsonObj.attributes.attr2);
      attr2Output.set("attr2_prop1",jsonObj.attributes.attr2.prop1);
      attr2Output.set("attr2_prop2",jsonObj.attributes.attr2.prop2);
      
      attributesOutput .set("attr2",attr2Output);
      output.set("attributes", attributesOutput );
      output.set("name", jsonObj.name);
      output.set("ts", jsonObj.timeStamp/1000);
      //Ausgabe
      dofn.output(output);
    }
}
package com.mycompany.dataflow_sample.schema;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.ArrayList;
import java.util.List;

public class SampleSchemaFactory {
    public static TableSchema create() {
        List<TableFieldSchema> fields;
        fields = new ArrayList<> ();
        fields.add(new TableFieldSchema().setName("name").setType("STRING"));
        fields.add(new TableFieldSchema().setName("ts").setType("TIMESTAMP"));
        fields.add(new TableFieldSchema().setName("attributes").setType("RECORD")
                .setFields(new ArrayList<TableFieldSchema>() {
                    {
                        add(new TableFieldSchema().setName("attr1").setType("STRING"));
                        add(new TableFieldSchema().setName("attr2").setType("RECORD")
                            .setFields(new ArrayList<TableFieldSchema>() {
                                {
                                    add(new TableFieldSchema().setName("prop1").setType("INTEGER"));
                                    add(new TableFieldSchema().setName("prop2").setType("STRING"));
                                }
                            })
                        );
                    }
                })
        );
        TableSchema schema = new TableSchema().setFields(fields);

        return schema;
    }
}

So was.

Bereitstellen / Testen

Erstellen Sie einfach den Java-Code, den Sie zuvor geschrieben haben, und führen Sie ihn aus. Der Job wird dann tatsächlich auf GCP bereitgestellt.

dataflow.png

Senden Sie eine Nachricht von der GCP-Konsole.

pubsub.png

Die Funktionsprüfung ist in Ordnung, wenn Daten in BigQuery eingefügt werden :)

Überprüfen Sie das Protokoll

Sie können das Protokoll im Detailbildschirm überprüfen, indem Sie auf den auf der Konsole erstellten Job klicken. Protokolle werden automatisch an Stackdriver übertragen. Daher empfiehlt es sich, diese für die Überwachung und Überwachung zu verwenden.

detail.png

Verhalten zum Zeitpunkt des Fehlers (Auftreten einer Ausnahme)

Wenn beim Ausführen eines DataflowJob eine Ausnahme auftritt, wird ACK nicht an PubSub gesendet, sodass die Daten erneut abgerufen werden.

Deshalb

--PubSub-Nachrichten werden auch dann nicht verworfen, wenn die Verarbeitung im Ladeteil von BigQuery verloren geht. Sie werden daher automatisch wiederholt (es ist nicht erforderlich, einen erneuten Versuch auf Kommunikationsebene in Betracht zu ziehen).

Stichprobe

Das Beispiel ist hier. Ich wollte es partitionieren, also habe ich es geschafft, eine tägliche Tabelle basierend auf der täglichen Tabelle zum Zeitpunkt der Ausführung zu erstellen.

Referenz

Offizielle Beispielcode-Sammlung (Github) apache beam sdk Google-Beamter

Recommended Posts

Erstellen Sie eine Datenverarbeitungsumgebung mit Google Cloud DataFlow + Pub / Sub
Erstellen Sie mit Docker eine Node.js-Umgebung
Erstellen Sie mit Pleiades 4.8 eine Tomcat 8.5-Umgebung
Erstellen Sie mit Docker eine PureScript-Entwicklungsumgebung
Erstellen Sie mit Docker eine Wordpress-Entwicklungsumgebung
Erstellen Sie eine Laravel / Docker-Umgebung mit VSCode devcontainer
Erstellen Sie mit Docker schnell eine WordPress-Entwicklungsumgebung
[Win10] Erstellen Sie eine JSF-Entwicklungsumgebung mit NetBeans
Erstellen Sie eine Java-Entwicklungsumgebung mit VS Code
Erstellen Sie eine Ruby on Rails-Entwicklungsumgebung in AWS Cloud9
[Hinweis] Erstellen Sie eine Python3-Umgebung mit Docker in EC2
Erstellen Sie ein internes Maven-Repository in Google Cloud Storage
Erstellen Sie mit IntelliJ IDEA eine Entwicklungsumgebung "Spring Thorough Introduction"
Erstellen Sie mit VirtualBox eine virtuelle CentOS 8-Umgebung auf Ihrem Mac
Erstellen einer Docker-Umgebung mit WSL
Ruby ① Erstellen einer Windows-Umgebung
Erstellen Sie mit Docker Compose eine Entwicklungsumgebung für Django + MySQL + nginx
Schritte zum Erstellen einer Ruby on Rails-Entwicklungsumgebung mit Vagrant
Erstellen Sie mit Docker eine Vue3-Umgebung!
Erstellen Sie mit Docker eine lokale Couchbase-Umgebung
Erstellen eines Java-Projekts mit Gradle
Erstellen Sie eine Webanwendung mit Javalin
Verwenden Sie Java 11 mit Google Cloud-Funktionen
[Google Cloud] Erste Schritte mit Docker
Erstellen Sie eine XAMPP-Umgebung unter Ubuntu
Google Cloud Platform mit Spring Boot 2.0.0
Erstellen Sie mit Eclipse eine Jooby-Entwicklungsumgebung
Mit Scala erstellte Cloud-Datenflussvorlage
Erstellen Sie mit Laradock eine Docker + Laravel-Umgebung
Erstellen Sie eine Windows-Anwendungstestumgebung mit Selenium Grid, Appium und Windows Application Driver
[Kopieren und Einfügen] Erstellen Sie mit Docker Compose Part 2 eine Laravel-Entwicklungsumgebung
So erstellen Sie eine Ruby on Rails-Entwicklungsumgebung mit Docker (Rails 6.x)
Erstellen Sie mit Docker eine lokale Entwicklungsumgebung für Rails-Tutorials (Rails 6 + PostgreSQL + Webpack)
Erstellen Sie eine Bulletin-Board-API mit Zertifizierung und Autorisierung mit Rails 6 # 1 Environment Construction
Erstellen Sie mit CentOS7 + Nginx + pm2 + Nuxt.js eine Entwicklungsumgebung auf AWS EC2
So erstellen Sie eine Ruby on Rails-Entwicklungsumgebung mit Docker (Rails 5.x)
Vorlage: Erstellen Sie eine Ruby / Rails-Entwicklungsumgebung mit einem Docker-Container (Ubuntu-Version).
Vorlage: Erstellen Sie eine Ruby / Rails-Entwicklungsumgebung mit einem Docker-Container (Mac-Version).