Un article d'introduction selon le titre. Écrivez du code en Java au lieu de Python, traitez avec le streaming au lieu de badges et utilisez Dataflow comme un runner au lieu de local.
Dans cet article, nous allons créer un flux de données qui reçoit les données d'une rubrique PubSub, effectue un traitement simple, puis écrit les données dans une autre rubrique PubSub.
Créez les ressources GCP requises.
Maven est utilisé ici.
<dependency>
<groupId>com.google.cloud.dataflow</groupId>
<artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
<version>2.5.0</version>
</dependency>
Ajoutez «Hello» aux données reçues de PubSub et placez-le dans un autre PubSub.
AddHello.java
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
public class AddHello {
private static final String PROJECT = "[YOUR PROJECT]";
private static final String STAGING_LOCATION = "gs://[YOUR GCS BACKET]/staging";
private static final String TEMP_LOCATION = "gs://[YOUR GCS BACKET]/temp";
private static final String SRC_PUBSUB_TOPIC = "projects/[YOUR PROJECT]/topics/[PUBSUB TOPIC 1]";
private static final String DST_PUBSUB_TOPIC = "projects/[YOUR PROJECT]/topics/[PUBSUB TOPIC 2]";
static class MyFn extends DoFn<String, String> {
@ProcessElement
public void processElement(ProcessContext c) {
c.output("Hello," + c.element());
}
}
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.create();
DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
dataflowOptions.setRunner(DataflowRunner.class);
dataflowOptions.setProject(PROJECT);
dataflowOptions.setStagingLocation(STAGING_LOCATION);
dataflowOptions.setTempLocation(TEMP_LOCATION);
dataflowOptions.setNumWorkers(1);
Pipeline p = Pipeline.create(dataflowOptions);
p.apply(PubsubIO.readStrings().fromTopic(SRC_PUBSUB_TOPIC))
.apply(ParDo.of(new MyFn()))
.apply(PubsubIO.writeStrings().to(DST_PUBSUB_TOPIC));
p.run();
}
}
Définissez la variable d'environnement sur GOOGLE_APPLICATION_CREDENTIALS = / chemin / vers / xxxxx.json
et exécutez le code ci-dessus.
Sélectionnez Dataflow dans la console Web GCP et vérifiez qu'il a été déployé.
Placez les données dans la rubrique PubSub qui est la source de données. Cela est possible depuis la console Web GCP. Dataflow peut ne pas être chargé immédiatement après le déploiement, il peut donc être judicieux de prendre un certain temps.
Créez un abonnement (appelé provisoirement mon abonnement) dans la rubrique PubSub de la destination de sortie des données et acquérez les données.
$ gcloud pubsub subscriptions pull my-subscription --auto-ack