Trying out Dataflow with Java in streaming mode

Overview

As the title says, this is an introductory article. The code is written in Java rather than Python, the data is processed as a stream rather than in batches, and Dataflow is used as the runner instead of the local runner.

In this article, we'll create a Dataflow pipeline that reads messages from a PubSub topic, does some simple processing, and then writes the results to another PubSub topic.

Environment used for verification

Premise

Procedure

Creating GCP resources

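Create the required GCP resources: a GCS bucket for Dataflow's staging and temporary files, a PubSub topic to read from, and a second PubSub topic to write to.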
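The SRC_PUBSUB_TOPIC and DST_PUBSUB_TOPIC constants in AddHello.java must hold full topic resource names of the form projects/<project>/topics/<topic>, not bare topic IDs. As a hedged sketch, a small hypothetical helper (PubsubNames is not part of the article, Beam, or the Pub/Sub client library, and "my-project" is a placeholder) could build those names and reject obviously malformed ones before the pipeline is submitted:

```java
import java.util.regex.Pattern;

// Hypothetical helper (not part of the original article or of Beam):
// builds the full topic resource name that PubsubIO's fromTopic()/to() expect.
final class PubsubNames {

    // Loose shape check: "projects/<project>/topics/<topic>" with non-empty,
    // slash-free parts. Pub/Sub's real naming rules are stricter than this.
    private static final Pattern TOPIC_NAME =
            Pattern.compile("projects/[^/]+/topics/[^/]+");

    static String topic(String projectId, String topicId) {
        String name = "projects/" + projectId + "/topics/" + topicId;
        if (!TOPIC_NAME.matcher(name).matches()) {
            throw new IllegalArgumentException("Malformed topic name: " + name);
        }
        return name;
    }

    public static void main(String[] args) {
        // Prints projects/my-project/topics/my-source-topic
        System.out.println(topic("my-project", "my-source-topic"));
    }
}
```

The check is deliberately loose: it only catches empty IDs or stray slashes, not every rule Pub/Sub enforces on topic names.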

Dependency settings (using Maven)

This article uses Maven. Add the following dependency to the dependencies section of pom.xml.

        <dependency>
            <groupId>com.google.cloud.dataflow</groupId>
            <artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
            <version>2.5.0</version>
        </dependency>

Source code

The pipeline prepends "Hello," to each message received from PubSub and publishes the result to another PubSub topic.
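Because the transformation is a plain string operation, it can live in an ordinary static method and be checked without a pipeline or a PubSub connection. A minimal sketch, assuming a hypothetical HelloFormatter class that is not in the original code; addHelloToAll is only a local stand-in for what the pipeline does to each message:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper (not in the original article): the per-element logic
// of MyFn as a plain static method, so it can be tested without Beam.
final class HelloFormatter {

    // Same concatenation as MyFn: "Hello," + message, with no space added.
    static String addHello(String message) {
        return "Hello," + message;
    }

    // Local stand-in for the pipeline: applies addHello to every message.
    static List<String> addHelloToAll(List<String> messages) {
        return messages.stream()
                .map(HelloFormatter::addHello)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Prints [Hello,Alice, Hello,Bob]
        System.out.println(addHelloToAll(Arrays.asList("Alice", "Bob")));
    }
}
```

With a helper like this, MyFn.processElement could call c.output(HelloFormatter.addHello(c.element())), keeping the DoFn a thin wrapper around logic that is easy to unit-test.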

AddHello.java


import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

public class AddHello {
    private static final String PROJECT = "[YOUR PROJECT]";
    private static final String STAGING_LOCATION = "gs://[YOUR GCS BUCKET]/staging";
    private static final String TEMP_LOCATION = "gs://[YOUR GCS BUCKET]/temp";
    private static final String SRC_PUBSUB_TOPIC = "projects/[YOUR PROJECT]/topics/[PUBSUB TOPIC 1]";
    private static final String DST_PUBSUB_TOPIC = "projects/[YOUR PROJECT]/topics/[PUBSUB TOPIC 2]";

    // Prepends "Hello," to each incoming message.
    static class MyFn extends DoFn<String, String> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            c.output("Hello," + c.element());
        }
    }

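    public static void main(String[] args) {
        // Configure the pipeline to run on Dataflow with the settings above.
        PipelineOptions options = PipelineOptionsFactory.create();
        DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
        dataflowOptions.setRunner(DataflowRunner.class);
        dataflowOptions.setProject(PROJECT);
        dataflowOptions.setStagingLocation(STAGING_LOCATION);
        dataflowOptions.setTempLocation(TEMP_LOCATION);
        dataflowOptions.setNumWorkers(1);
        // Run as a streaming job explicitly, since the PubSub source is unbounded.
        dataflowOptions.setStreaming(true);

        // Read from the source topic, prepend "Hello,", write to the destination topic.
        Pipeline p = Pipeline.create(dataflowOptions);
        p.apply(PubsubIO.readStrings().fromTopic(SRC_PUBSUB_TOPIC))
                .apply(ParDo.of(new MyFn()))
                .apply(PubsubIO.writeStrings().to(DST_PUBSUB_TOPIC));
        // Submit the job to Dataflow; run() returns without waiting for the job to finish.
        p.run();
    }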
}

Deploy

Set the environment variable GOOGLE_APPLICATION_CREDENTIALS=/path/to/xxxxx.json so that it points to your credentials JSON file, then run the code above.
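A missing or mistyped path is an easy mistake to make here. A hedged sketch of a hypothetical pre-flight check (CredentialsCheck is not part of the article or of any Google library) that could run before the pipeline is built; it takes the environment as a map so it can be tested without touching real environment variables:

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import java.util.Optional;

// Hypothetical pre-flight check (not part of the original article): looks up
// GOOGLE_APPLICATION_CREDENTIALS and returns the key file path only if it
// points at an existing regular file.
final class CredentialsCheck {
    static final String VAR = "GOOGLE_APPLICATION_CREDENTIALS";

    static Optional<Path> credentialsFile(Map<String, String> env) {
        String value = env.get(VAR);
        if (value == null || value.trim().isEmpty()) {
            return Optional.empty();
        }
        Path path = Paths.get(value.trim());
        return Files.isRegularFile(path) ? Optional.of(path) : Optional.empty();
    }

    public static void main(String[] args) {
        Optional<Path> key = credentialsFile(System.getenv());
        if (!key.isPresent()) {
            System.err.println(VAR + " is not set or does not point to a file");
            System.exit(1);
        }
        System.out.println("Using credentials from " + key.get());
    }
}
```

Application Default Credentials can also come from other sources (for example gcloud auth application-default login), so this check only matches the environment-variable approach used in this article.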

Open Dataflow in the GCP web console and confirm that the job has been deployed.

Operation check

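Publish some data to the source PubSub topic; you can do this from the GCP web console. The Dataflow job may not be ready to process messages right after deployment, so it is a good idea to wait a few minutes first. When reading with fromTopic, the pipeline creates its own subscription on the topic when it starts, so messages published before then are not delivered to it.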

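Create a subscription (here called my-subscription) on the output PubSub topic and pull the data from it. Create this subscription before publishing the test data, because a new subscription normally only receives messages published after it was created.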

$ gcloud pubsub subscriptions pull my-subscription --auto-ack

