In order to learn how to process streams using Cloud Dataflow on Google Cloud Platform (GCP), first, "I tried to create a mechanism to detect DoS using [Cloud Dataflow + Cloud Pub / Sub + Fluentd]" (/ tetsuyam / items / 6636c6bbc3ee7fadfbe3) ”was tried to work. However, ** SDK has changed to Apache Beam base **, so it didn't work as it is, so I will summarize the code changes.
Rather than the raw diff output, I've rearranged it a bit so that the corresponding diffs are next to each other.
--- ../Dos.java.orig 2017-11-15 17:22:31.583396781 +0900
+++ src/main/java/com/google/cloud/dataflow/examples/Dos.java 2017-11-15 17:17:45.067769398 +0900
@@ -1,35 +1,35 @@
package com.google.cloud.dataflow.examples;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.PipelineResult;
-import com.google.cloud.dataflow.sdk.io.TextIO;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.io.PubsubIO;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.DefaultValueFactory;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
+// import org.apache.beam.sdk.transforms.Aggregator;
-import com.google.cloud.dataflow.sdk.transforms.Count;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.Combine;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.Sum;
-import com.google.cloud.dataflow.sdk.transforms.Top;
-import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
-import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.KV.OrderByValue;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollectionList;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.Top;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.KV.OrderByValue;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
-import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
-import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
+// import org.apache.beam.sdk.runners.BlockingDataflowRunner;
+import org.apache.beam.runners.dataflow.DataflowRunner;
+import org.apache.beam.runners.direct.DirectRunner;
import java.util.List;
import org.joda.time.Duration;
@@ -52,9 +52,9 @@
static class GetIPFn extends DoFn<String, String> {
private static final long serialVersionUID = 0;
- private final Aggregator<Long, Long> emptyLines =
- createAggregator("emptyLines", new Sum.SumLongFn());
+ // private final Aggregator<Long, Long> emptyLines =
+ // createAggregator("emptyLines", new Sum.SumLongFn());
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
Gson gson = new Gson();
ApacheLog log = gson.fromJson(c.element() , ApacheLog.class);
@@ -67,7 +67,7 @@
public static class FormatAsTextFnFromList extends DoFn<List<KV<String, Long>>, String> {
private static final long serialVersionUID = 0;
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
for(KV value : c.element()){
Integer access_count = new Integer(value.getValue().toString());
@@ -79,22 +79,22 @@
public static void main(String[] args) {
DataflowPipelineOptions options = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
options.setProject("xxxxxxxxxxxx");//Specify project
options.setStagingLocation("gs://xxxxxxxxxxxx/staging");//Specify the path where the jar file will be placed
options.setJobName("doscheck");//Specify the name of this job
- options.setRunner(DataflowPipelineRunner.class);//I haven't researched Runner properly. .. ..
+ options.setRunner(DataflowRunner.class);//I haven't researched Runner properly. .. ..
options.setStreaming(true);//I want to do stream processing, so true
Pipeline p = Pipeline.create(options);//Create a topology called a pipeline
//Flesh the topology to create a processing flow
- p.apply(PubsubIO.Read.named("ReadFromPubsub").topic("projects/xxxxxxx/topics/input_topic"))//Obtained from Pubsub
+ p.apply("ReadFromPubsub", PubsubIO.readStrings().fromTopic("projects/xxxxxxx/topics/input_topic"))//Obtained from Pubsub
.apply(Window.<String>into(SlidingWindows.of(Duration.standardSeconds(10)).every(Duration.standardSeconds(5))))//Process 10 seconds while sliding every 5 seconds
.apply(ParDo.of(new GetIPFn()))//Get IP from log
.apply(Count.<String>perElement())//Count by IP
.apply(Top.<KV<String, Long>, KV.OrderByValue<String,Long>>of(3,new KV.OrderByValue<String,Long>()).withoutDefaults())//Extract the top 3
.apply(ParDo.of(new FormatAsTextFnFromList()))//Convert to text format for publish to Pubsub
- .apply(PubsubIO.Write.named("WriteCounts").topic("projects/xxxxxxxxxxx/topics/count_ip"));//Publish to pubsub
+ .apply("WriteCounts", PubsubIO.writeStrings().to("projects/xxxxxxxxxxx/topics/count_ip"));//Publish to pubsub
p.run();//Run pipeline
}
See GCP's Release Notes: Dataflow SDK 2.x for Java for changes. Did.
com.google.cloud.dataflow
to ʻorg.apache.beam`PubsubIO
moved to subpackagesPipeline
from class nameBlockingDataflowPipelineRunner
has been removed and integrated into DataflowRunner
@Override
to @ProcessElement
PCollection.apply ()
instead of named ()
Read
but [readStrings ()
](https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO. Instantiate with html # readStrings-) [^ t_read](same for write)topic ()
is [fromTopic ()
](https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO .Read.html # fromTopic-java.lang.String-) / [to ()
](https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk Changed to /io/gcp/pubsub/PubsubIO.Write.html#to-java.lang.String-)[^ t_read]: The docs say to instantiate with <T> read ()
, but it's private and couldn't be called. [readStrings ()
](https://github.com/apache/beam/blob/v2.1.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/ sdk / io / gcp / pubsub / PubsubIO.java # L469) etc. are calling this inside.
I didn't know how to create a project in Maven, so I wrote down the command based on Quickstart. I will do it. I ran it in the terminal of the GCP console.
Project creation
$ mvn archetype:generate \
-DarchetypeArtifactId=google-cloud-dataflow-java-archetypes-examples \
-DarchetypeGroupId=com.google.cloud.dataflow \
-DarchetypeVersion=2.1.0 \
-DgroupId=com.google.cloud.dataflow.examples \
-DartifactId=dos \
-Dversion="0.1" \
-DinteractiveMode=false \
-Dpackage=com.google.cloud.dataflow.examples
$ cd dos/
$ rm -rf src/main/java/com/google/cloud/dataflow/examples/*
$ vim src/main/java/com/google/cloud/dataflow/examples/Dos.java
Compile and run
$ mvn compile exec:java \
-Dexec.mainClass=com.google.cloud.dataflow.examples.Dos
Recommended Posts