Führen Sie jetzt Dataflow, Java, Streaming aus

Überblick

Ein Einführungsartikel nach dem Titel. Schreiben Sie Code in Java anstelle von Python, verarbeiten Sie ihn mit Streaming anstelle von Badges und verwenden Sie Dataflow als Runner anstelle von lokal.

In diesem Artikel erstellen wir einen Datenfluss, der Daten von einem PubSub-Thema empfängt, eine einfache Verarbeitung durchführt und die Daten dann in ein anderes PubSub-Thema schreibt.

Umgebung zur Überprüfung verwendet

Annahme

Verfahren

GCP-Ressourcen erstellen

Erstellen Sie die erforderlichen GCP-Ressourcen.

Abhängigkeitseinstellungen (mit Maven)

Maven wird hier verwendet.

        <dependency>
            <groupId>com.google.cloud.dataflow</groupId>
            <artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
            <version>2.5.0</version>
        </dependency>

Quellcode

Fügen Sie "Hallo" zu den von PubSub empfangenen Daten hinzu und fügen Sie sie in einen anderen PubSub ein.

AddHello.java


import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

public class AddHello {
    private static final String PROJECT = "[YOUR PROJECT]";
    private static final String STAGING_LOCATION = "gs://[YOUR GCS BACKET]/staging";
    private static final String TEMP_LOCATION = "gs://[YOUR GCS BACKET]/temp";
    private static final String SRC_PUBSUB_TOPIC = "projects/[YOUR PROJECT]/topics/[PUBSUB TOPIC 1]";
    private static final String DST_PUBSUB_TOPIC = "projects/[YOUR PROJECT]/topics/[PUBSUB TOPIC 2]";

    static class MyFn extends DoFn<String, String> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            c.output("Hello," + c.element());
        }
    }

    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.create();
        DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
        dataflowOptions.setRunner(DataflowRunner.class);
        dataflowOptions.setProject(PROJECT);
        dataflowOptions.setStagingLocation(STAGING_LOCATION);
        dataflowOptions.setTempLocation(TEMP_LOCATION);
        dataflowOptions.setNumWorkers(1);

        Pipeline p = Pipeline.create(dataflowOptions);
        p.apply(PubsubIO.readStrings().fromTopic(SRC_PUBSUB_TOPIC))
                .apply(ParDo.of(new MyFn()))
                .apply(PubsubIO.writeStrings().to(DST_PUBSUB_TOPIC));
        p.run();
    }
}

Bereitstellen

Setzen Sie die Umgebungsvariable auf "GOOGLE_APPLICATION_CREDENTIALS = / path / to / xxxxx.json" und führen Sie den obigen Code aus.

Wählen Sie in der GCP-Webkonsole Datenfluss aus und überprüfen Sie, ob er bereitgestellt wurde.

Funktionsprüfung

Fügen Sie die Daten in das PubSub-Thema ein, das die Datenquelle ist. Dies ist über die GCP-Webkonsole möglich. Der Datenfluss wird möglicherweise nicht unmittelbar nach der Bereitstellung geladen. Daher empfiehlt es sich, einige Zeit in Anspruch zu nehmen.

Erstellen Sie im PubSub-Thema des Datenausgabeziels ein Abonnement (vorläufig als my-Abonnement bezeichnet) und erfassen Sie die Daten.

$ gcloud pubsub subscriptions pull my-subscription --auto-ack

Referenzmaterial

Recommended Posts

Führen Sie jetzt Dataflow, Java, Streaming aus
Verwenden Sie vorerst eine externe Java-Bibliothek
Einführung in Java zum ersten Mal # 2
Zum ersten Mal Java lernen [Einführung]
Java14 kam heraus, also habe ich vorerst versucht aufzunehmen
Java12 kam heraus, also habe ich vorerst den Schalterausdruck ausprobiert
[Erstes Java] Machen Sie etwas, das vorerst mit Intellij funktioniert
Installieren Sie vorerst Amazon Corretto (Vorschau)
Ich möchte, dass Sie Scala vorerst als besseres Java verwenden
[Deep Learning von Grund auf neu] in Java 1. Zur Zeit Differenzierung und teilweise Differenzierung
Versuchen Sie vorerst, Spring Cloud Config auszuführen
Lernnotiz beim ersten Lernen von Java (persönliches Lernnotiz)
Befehl, um Docker vorerst zu verwenden
Hallo Welt mit Ruby-Erweiterungsbibliothek vorerst
Greifen Sie mit Get on Android auf die Web-API zu und verarbeiten Sie Json (vorerst Java).
[Für Anfänger] Führen Sie Selenium auf Java aus
Spring AOP zum ersten Mal
[Memo] Führen Sie Node.js v4.4.5 vorerst unter CentOS 4.9 / RHEL4 (i386) aus (gcc-4.8 und glibc2.11 unter LinuxKernel 2.6.9).
Aufbau eines DLNA-Servers unter Ubuntu (vorerst nur verschieben)
Glassfish Tuning List, die ich vorerst behalten möchte
Java Programming Style Guide für die Java 11-Ära
[Java] So stellen Sie die Datums- und Uhrzeit auf 00:00:00 ein
[Socket-Kommunikation (Java)] Eindrücke von der erstmaligen Implementierung der Socket-Kommunikation in der Praxis
Erste Programmierung in meinem Leben Java 1st Hello World
Verwenden Sie die Methode l für die Zeitnotation
Eindrücke und Zweifel an der erstmaligen Verwendung von Java in Android Studio
Überprüfen Sie die für den laufenden Java-Prozess festgelegten Optionen
ChatWork4j für die Verwendung der ChatWork-API in Java
[Java] Stellen Sie die Zeit im Browser mit jsoup ein
Wände von der ersten Rspec getroffen
Spüren Sie den Lauf der Zeit auch in Java
Was ist der flüchtige Modifikator für Java-Variablen?
Android Studio-Entwicklung zum ersten Mal (für Anfänger)
[Java] (für MacOS) Methode zur Einstellung des Klassenpfads
Ich habe zum ersten Mal versucht, Docker zu berühren
Kompilieren Sie Java und führen Sie es in der Befehlszeile aus
Das Datum und die Uhrzeit von java8 wurden aktualisiert
Für JAVA-Lernen (2018-03-16-01)
2017 IDE für Java
Vorläufiges Memo beim Erstellen der CentOS 6-Serie mit VirtualBox
[Schienen] N + 1 ist böse! Wenn es doch passiert, lösen Sie es vorerst! !! Ist gefährlich
Java für Anweisung
[Tutorial] Eclipse herunterladen → Anwendung mit Java ausführen (Plejaden)
[Für Anfänger] Verstehen Sie schnell die Grundlagen von Java 8 Lambda
Ein Hinweis zum Initialisieren von Feldern im Java-Lernprogramm
Ich konnte es nach dem Upgrade der Java-Version nicht ausführen
Anmeldeinformationen, auf die das AWS SDK für Java standardmäßig verweist
Was Java-Ingenieure benötigen, um sich auf die Java 11-Version vorzubereiten
Wie man Kotlin zum ersten Mal studiert ~ Teil 2 ~
Wie man Kotlin zum ersten Mal studiert ~ Teil 1 ~
Zusammenfassung der Punkte, die Java-Programmierer beim ersten Lesen der Kotlin-Quelle als rutschig empfinden