Ein Einführungsartikel nach dem Titel. Schreiben Sie Code in Java anstelle von Python, verarbeiten Sie ihn mit Streaming anstelle von Badges und verwenden Sie Dataflow als Runner anstelle von lokal.
In diesem Artikel erstellen wir einen Datenfluss, der Daten von einem PubSub-Thema empfängt, eine einfache Verarbeitung durchführt und die Daten dann in ein anderes PubSub-Thema schreibt.
Erstellen Sie die erforderlichen GCP-Ressourcen.
Maven wird hier verwendet.
<dependency>
<groupId>com.google.cloud.dataflow</groupId>
<artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
<version>2.5.0</version>
</dependency>
Fügen Sie "Hallo" zu den von PubSub empfangenen Daten hinzu und fügen Sie sie in einen anderen PubSub ein.
AddHello.java
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
public class AddHello {
private static final String PROJECT = "[YOUR PROJECT]";
private static final String STAGING_LOCATION = "gs://[YOUR GCS BACKET]/staging";
private static final String TEMP_LOCATION = "gs://[YOUR GCS BACKET]/temp";
private static final String SRC_PUBSUB_TOPIC = "projects/[YOUR PROJECT]/topics/[PUBSUB TOPIC 1]";
private static final String DST_PUBSUB_TOPIC = "projects/[YOUR PROJECT]/topics/[PUBSUB TOPIC 2]";
static class MyFn extends DoFn<String, String> {
@ProcessElement
public void processElement(ProcessContext c) {
c.output("Hello," + c.element());
}
}
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.create();
DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
dataflowOptions.setRunner(DataflowRunner.class);
dataflowOptions.setProject(PROJECT);
dataflowOptions.setStagingLocation(STAGING_LOCATION);
dataflowOptions.setTempLocation(TEMP_LOCATION);
dataflowOptions.setNumWorkers(1);
Pipeline p = Pipeline.create(dataflowOptions);
p.apply(PubsubIO.readStrings().fromTopic(SRC_PUBSUB_TOPIC))
.apply(ParDo.of(new MyFn()))
.apply(PubsubIO.writeStrings().to(DST_PUBSUB_TOPIC));
p.run();
}
}
Setzen Sie die Umgebungsvariable auf "GOOGLE_APPLICATION_CREDENTIALS = / path / to / xxxxx.json" und führen Sie den obigen Code aus.
Wählen Sie in der GCP-Webkonsole Datenfluss aus und überprüfen Sie, ob er bereitgestellt wurde.
Fügen Sie die Daten in das PubSub-Thema ein, das die Datenquelle ist. Dies ist über die GCP-Webkonsole möglich. Der Datenfluss wird möglicherweise nicht unmittelbar nach der Bereitstellung geladen. Daher empfiehlt es sich, einige Zeit in Anspruch zu nehmen.
Erstellen Sie im PubSub-Thema des Datenausgabeziels ein Abonnement (vorläufig als my-Abonnement bezeichnet) und erfassen Sie die Daten.
$ gcloud pubsub subscriptions pull my-subscription --auto-ack
Recommended Posts