[Java] Changes needed to run "Using Cloud Dataflow + Cloud Pub/Sub + Fluentd ..." with SDK 2.1

To learn how to do stream processing with Cloud Dataflow on Google Cloud Platform (GCP), I first tried to get ["I tried to create a mechanism to detect DoS using Cloud Dataflow + Cloud Pub/Sub + Fluentd"](/tetsuyam/items/6636c6bbc3ee7fadfbe3) working. However, **the SDK has moved to an Apache Beam base**, so the code no longer works as is. This article summarizes the code changes that were needed.

Code difference

Rather than showing the raw diff output, I've rearranged it a bit so that the corresponding changes sit next to each other.

--- ../Dos.java.orig	2017-11-15 17:22:31.583396781 +0900
+++ src/main/java/com/google/cloud/dataflow/examples/Dos.java	2017-11-15 17:17:45.067769398 +0900
@@ -1,35 +1,35 @@
 package com.google.cloud.dataflow.examples;

-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.PipelineResult;
-import com.google.cloud.dataflow.sdk.io.TextIO;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.io.PubsubIO;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.DefaultValueFactory;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
+// import org.apache.beam.sdk.transforms.Aggregator;
-import com.google.cloud.dataflow.sdk.transforms.Count;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.Combine;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.Sum;
-import com.google.cloud.dataflow.sdk.transforms.Top;
-import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
-import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.KV.OrderByValue;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollectionList;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.Top;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.KV.OrderByValue;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
-import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
-import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
+// import org.apache.beam.sdk.runners.BlockingDataflowRunner;
+import org.apache.beam.runners.dataflow.DataflowRunner;
+import org.apache.beam.runners.direct.DirectRunner;

 import java.util.List;
 import org.joda.time.Duration;
@@ -52,9 +52,9 @@

   static class GetIPFn extends DoFn<String, String> {
     private static final long serialVersionUID = 0;
-    private final Aggregator<Long, Long> emptyLines =
-        createAggregator("emptyLines", new Sum.SumLongFn());
+    // private final Aggregator<Long, Long> emptyLines =
+    //     createAggregator("emptyLines", new Sum.SumLongFn());
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) {
       Gson gson = new Gson();
       ApacheLog log = gson.fromJson(c.element() , ApacheLog.class);
@@ -67,7 +67,7 @@

   public static class FormatAsTextFnFromList extends DoFn<List<KV<String, Long>>, String> {
     private static final long serialVersionUID = 0;
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) {
       for(KV value : c.element()){
         Integer access_count = new Integer(value.getValue().toString());
@@ -79,22 +79,22 @@

   public static void main(String[] args) {
     DataflowPipelineOptions options = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
     options.setProject("xxxxxxxxxxxx");//Specify project
     options.setStagingLocation("gs://xxxxxxxxxxxx/staging");//Specify the path where the jar file will be placed
     options.setJobName("doscheck");//Specify the name of this job
-    options.setRunner(DataflowPipelineRunner.class);//I haven't researched Runner properly...
+    options.setRunner(DataflowRunner.class);//I haven't researched Runner properly...
     options.setStreaming(true);//I want to do stream processing, so true

     Pipeline p = Pipeline.create(options);//Create a topology called a pipeline
     //Flesh the topology to create a processing flow
-    p.apply(PubsubIO.Read.named("ReadFromPubsub").topic("projects/xxxxxxx/topics/input_topic"))//Obtained from Pubsub
+    p.apply("ReadFromPubsub", PubsubIO.readStrings().fromTopic("projects/xxxxxxx/topics/input_topic"))//Obtained from Pubsub
      .apply(Window.<String>into(SlidingWindows.of(Duration.standardSeconds(10)).every(Duration.standardSeconds(5))))//Process 10 seconds while sliding every 5 seconds
      .apply(ParDo.of(new GetIPFn()))//Get IP from log
      .apply(Count.<String>perElement())//Count by IP
      .apply(Top.<KV<String, Long>, KV.OrderByValue<String,Long>>of(3,new KV.OrderByValue<String,Long>()).withoutDefaults())//Extract the top 3
      .apply(ParDo.of(new FormatAsTextFnFromList()))//Convert to text format for publish to Pubsub
-     .apply(PubsubIO.Write.named("WriteCounts").topic("projects/xxxxxxxxxxx/topics/count_ip"));//Publish to pubsub
+     .apply("WriteCounts", PubsubIO.writeStrings().to("projects/xxxxxxxxxxx/topics/count_ip"));//Publish to pubsub
     p.run();//Run pipeline
   }

For the full list of changes, I referred to GCP's Release Notes: Dataflow SDK 2.x for Java.
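
One change the diff glosses over: the Aggregator API was removed in the Beam-based SDK, which is why the emptyLines counter is simply commented out above. If you want to keep that counter, the Beam 2.x Metrics API is the usual replacement. Below is a minimal sketch of GetIPFn using it (my own rewrite, not part of the original article; ApacheLog and Gson are the same classes the original code uses):

```java
// Assumes: import org.apache.beam.sdk.metrics.Counter;
//          import org.apache.beam.sdk.metrics.Metrics;
static class GetIPFn extends DoFn<String, String> {
  private static final long serialVersionUID = 0;
  // Metrics.counter() replaces the removed createAggregator(...)
  private final Counter emptyLines = Metrics.counter(GetIPFn.class, "emptyLines");

  @ProcessElement
  public void processElement(ProcessContext c) {
    Gson gson = new Gson();
    ApacheLog log = gson.fromJson(c.element(), ApacheLog.class);
    if (log == null) {
      emptyLines.inc(); // shows up as a custom counter in the Dataflow monitoring UI
      return;
    }
    // ... rest of the original processElement body (extracting the IP) unchanged ...
  }
}
```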

[^t_read]: The docs say to instantiate with `<T> read()`, but it is private and cannot be called directly. [readStrings()](https://github.com/apache/beam/blob/v2.1.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L469) and the other factory methods call it internally.
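
In other words, in 2.1 you always go through these typed factory methods rather than read() itself. For reference, a small sketch of the entry points (my own example; the subscription name is made up):

```java
// Read strings from a topic (what the pipeline above does):
PCollection<String> logs =
    p.apply("ReadFromPubsub",
        PubsubIO.readStrings().fromTopic("projects/xxxxxxx/topics/input_topic"));

// Reading from an existing subscription is also possible:
PCollection<String> logsFromSub =
    p.apply(PubsubIO.readStrings()
        .fromSubscription("projects/xxxxxxx/subscriptions/my_subscription"));

// Write strings back to a topic:
logs.apply("WriteCounts",
    PubsubIO.writeStrings().to("projects/xxxxxxxxxxx/topics/count_ip"));
```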

Appendix

Maven operation

I didn't know how to create a Maven project, so I note down the commands I used, based on the Quickstart. I ran them in the terminal of the GCP console.

Project creation


$ mvn archetype:generate \
      -DarchetypeArtifactId=google-cloud-dataflow-java-archetypes-examples \
      -DarchetypeGroupId=com.google.cloud.dataflow \
      -DarchetypeVersion=2.1.0 \
      -DgroupId=com.google.cloud.dataflow.examples \
      -DartifactId=dos \
      -Dversion="0.1" \
      -DinteractiveMode=false \
      -Dpackage=com.google.cloud.dataflow.examples
$ cd dos/
$ rm -rf src/main/java/com/google/cloud/dataflow/examples/*
$ vim src/main/java/com/google/cloud/dataflow/examples/Dos.java

Compile and run


$ mvn compile exec:java \
      -Dexec.mainClass=com.google.cloud.dataflow.examples.Dos
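
In this version the project, staging location, runner and job name are hard-coded in main(). If you prefer passing them on the command line instead, Beam's standard PipelineOptionsFactory.fromArgs() handles that; a sketch of the variation (my own, not in the original code):

```java
DataflowPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args)  // parses --project=..., --stagingLocation=..., --runner=..., --jobName=...
        .withValidation()
        .as(DataflowPipelineOptions.class);
options.setStreaming(true);
Pipeline p = Pipeline.create(options);
```

which you would then run with something like:

```
$ mvn compile exec:java \
      -Dexec.mainClass=com.google.cloud.dataflow.examples.Dos \
      -Dexec.args="--project=xxxxxxxxxxxx --stagingLocation=gs://xxxxxxxxxxxx/staging --runner=DataflowRunner --jobName=doscheck"
```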
