Exécutez Dataflow, Java, Streaming pour le moment

Aperçu

Un article d'introduction selon le titre. Écrivez du code en Java au lieu de Python, traitez avec le streaming au lieu de badges et utilisez Dataflow comme un runner au lieu de local.

Dans cet article, nous allons créer un flux de données qui reçoit les données d'une rubrique PubSub, effectue un traitement simple, puis écrit les données dans une autre rubrique PubSub.

Environnement utilisé pour la vérification

supposition

procédure

Créer des ressources GCP

Créez les ressources GCP requises.

Paramètres de dépendance (à l'aide de Maven)

Maven est utilisé ici.

        <dependency>
            <groupId>com.google.cloud.dataflow</groupId>
            <artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
            <version>2.5.0</version>
        </dependency>

Code source

Ajoutez «Hello» aux données reçues de PubSub et placez-le dans un autre PubSub.

AddHello.java


import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

public class AddHello {
    private static final String PROJECT = "[YOUR PROJECT]";
    private static final String STAGING_LOCATION = "gs://[YOUR GCS BACKET]/staging";
    private static final String TEMP_LOCATION = "gs://[YOUR GCS BACKET]/temp";
    private static final String SRC_PUBSUB_TOPIC = "projects/[YOUR PROJECT]/topics/[PUBSUB TOPIC 1]";
    private static final String DST_PUBSUB_TOPIC = "projects/[YOUR PROJECT]/topics/[PUBSUB TOPIC 2]";

    static class MyFn extends DoFn<String, String> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            c.output("Hello," + c.element());
        }
    }

    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.create();
        DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
        dataflowOptions.setRunner(DataflowRunner.class);
        dataflowOptions.setProject(PROJECT);
        dataflowOptions.setStagingLocation(STAGING_LOCATION);
        dataflowOptions.setTempLocation(TEMP_LOCATION);
        dataflowOptions.setNumWorkers(1);

        Pipeline p = Pipeline.create(dataflowOptions);
        p.apply(PubsubIO.readStrings().fromTopic(SRC_PUBSUB_TOPIC))
                .apply(ParDo.of(new MyFn()))
                .apply(PubsubIO.writeStrings().to(DST_PUBSUB_TOPIC));
        p.run();
    }
}

Déployer

Définissez la variable d'environnement sur GOOGLE_APPLICATION_CREDENTIALS = / chemin / vers / xxxxx.json et exécutez le code ci-dessus.

Sélectionnez Dataflow dans la console Web GCP et vérifiez qu'il a été déployé.

Contrôle de fonctionnement

Placez les données dans la rubrique PubSub qui est la source de données. Cela est possible depuis la console Web GCP. Dataflow peut ne pas être chargé immédiatement après le déploiement, il peut donc être judicieux de prendre un certain temps.

Créez un abonnement (appelé provisoirement mon abonnement) dans la rubrique PubSub de la destination de sortie des données et acquérez les données.

$ gcloud pubsub subscriptions pull my-subscription --auto-ack

Matériel de référence

Recommended Posts

Exécutez Dataflow, Java, Streaming pour le moment
Utilisez une bibliothèque Java externe pour le moment
Introduction à Java pour la première fois # 2
Apprendre pour la première fois java [Introduction]
Java14 est sorti, alors j'ai essayé d'enregistrer pour le moment
Java12 est sorti, alors j'ai essayé l'expression switch pour le moment
[First Java] Créez quelque chose qui fonctionne avec Intellij pour le moment
Installez Amazon Corretto (préversion) pour le moment
Je veux que vous utilisiez Scala comme meilleur Java pour le moment
[Deep Learning from scratch] en Java 1. Pour le moment, différenciation et différenciation partielle
Essayez d'exécuter Spring Cloud Config pour le moment
Mémo d'apprentissage lors de l'apprentissage de Java pour la première fois (mémo d'apprentissage personnel)
Commande pour essayer d'utiliser Docker pour le moment
Hello World avec la bibliothèque d'extension Ruby pour le moment
Accédez à l'API Web avec Get sur Android et traitez Json (Java pour le moment)
[Pour les débutants] Exécutez Selenium sur Java
Spring AOP pour la première fois
[Memo] Exécutez Node.js v4.4.5 sur CentOS 4.9 / RHEL4 (i386) pour le moment (gcc-4.8 et glibc2.11 sur LinuxKernel 2.6.9)
Construire un serveur DLNA sur Ubuntu (il suffit de bouger pour le moment)
Liste de réglages de Glassfish que je souhaite conserver pour le moment
Guide de style de programmation Java pour l'ère Java 11
[Java] Comment régler la date sur 00:00:00
[Communication Socket (Java)] Impressions de la mise en œuvre de la communication Socket dans la pratique pour la première fois
Première programmation de ma vie Java 1st Hello World
Utilisez la méthode l pour la notation temporelle
Impressions et doutes sur l'utilisation de Java pour la première fois dans Android Studio
Vérifiez les options définies pour le processus Java en cours d'exécution
ChatWork4j pour l'utilisation de l'API ChatWork en Java
[Java] Réglez l'heure depuis le navigateur avec jsoup
Murs touchés par le premier Rspec
Ressentez le passage du temps même à Java
Quel est le modificateur volatile pour les variables Java?
Développement d'Android Studio pour la première fois (pour les débutants)
[Java] (pour MacOS) Méthode de définition du chemin de classe
J'ai essayé de toucher Docker pour la première fois
Compilez et exécutez Java sur la ligne de commande
La date et l'heure de java8 ont été mises à jour
Pour l'apprentissage JAVA (2018-03-16-01)
IDE 2017 pour Java
Mémo à faire pour le moment lors de la construction de la série CentOS 6 avec VirtualBox
[Rails] N + 1 est le mal! Si cela se produit, résolvez-le pour le moment! !! Est dangereux
Java pour instruction
[Tutoriel] Télécharger Eclipse → Lancer l'application avec Java (Pléiades)
[Pour les débutants] Comprendre rapidement les bases de Java 8 lambda
Remarque sur l'initialisation des champs dans le didacticiel Java
Je n'ai pas pu l'exécuter après la mise à niveau de la version Java
Informations d'identification référencées par le kit AWS SDK for Java par défaut
Ce dont les ingénieurs Java ont besoin pour se préparer à la version Java 11
Comment étudier le kotlin pour la première fois ~ Partie 2 ~
Comment étudier le kotlin pour la première fois ~ Partie 1 ~
Résumé des points que les programmeurs Java trouvent glissants lorsqu'ils lisent la source Kotlin pour la première fois