Afin d'apprendre à traiter les flux à l'aide de Cloud Dataflow sur Google Cloud Platform (GCP), tout d'abord, "[Cloud Dataflow + Cloud Pub / Sub + Fluentd a été utilisé pour créer un mécanisme de détection DoS](/ tetsuyam / items / 6636c6bbc3ee7fadfbe3) »a été essayé de fonctionner. Cependant, ** le SDK a changé pour Apache Beam base **, donc cela n'a pas fonctionné tel quel, donc je vais résumer les changements de code.
Plutôt que la sortie diff brute, elle est légèrement réorganisée afin que les différences correspondantes soient les unes à côté des autres.
--- ../Dos.java.orig 2017-11-15 17:22:31.583396781 +0900
+++ src/main/java/com/google/cloud/dataflow/examples/Dos.java 2017-11-15 17:17:45.067769398 +0900
@@ -1,35 +1,35 @@
package com.google.cloud.dataflow.examples;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.PipelineResult;
-import com.google.cloud.dataflow.sdk.io.TextIO;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.io.PubsubIO;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.DefaultValueFactory;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
+// import org.apache.beam.sdk.transforms.Aggregator;
-import com.google.cloud.dataflow.sdk.transforms.Count;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.Combine;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.Sum;
-import com.google.cloud.dataflow.sdk.transforms.Top;
-import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
-import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.KV.OrderByValue;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollectionList;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.Top;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.KV.OrderByValue;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
-import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
-import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
+// import org.apache.beam.sdk.runners.BlockingDataflowRunner;
+import org.apache.beam.runners.dataflow.DataflowRunner;
+import org.apache.beam.runners.direct.DirectRunner;
import java.util.List;
import org.joda.time.Duration;
@@ -52,9 +52,9 @@
static class GetIPFn extends DoFn<String, String> {
private static final long serialVersionUID = 0;
- private final Aggregator<Long, Long> emptyLines =
- createAggregator("emptyLines", new Sum.SumLongFn());
+ // private final Aggregator<Long, Long> emptyLines =
+ // createAggregator("emptyLines", new Sum.SumLongFn());
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
Gson gson = new Gson();
ApacheLog log = gson.fromJson(c.element() , ApacheLog.class);
@@ -67,7 +67,7 @@
public static class FormatAsTextFnFromList extends DoFn<List<KV<String, Long>>, String> {
private static final long serialVersionUID = 0;
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
for(KV value : c.element()){
Integer access_count = new Integer(value.getValue().toString());
@@ -79,22 +79,22 @@
public static void main(String[] args) {
DataflowPipelineOptions options = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
options.setProject("xxxxxxxxxxxx");//Spécifiez le projet
options.setStagingLocation("gs://xxxxxxxxxxxx/staging");//Spécifiez le chemin où le fichier jar sera placé
options.setJobName("doscheck");//Spécifiez le nom de ce travail
- options.setRunner(DataflowPipelineRunner.class);//Je n'ai pas correctement recherché Runner. .. ..
+ options.setRunner(DataflowRunner.class);//Je n'ai pas correctement recherché Runner. .. ..
options.setStreaming(true);//Je veux faire du traitement de flux, c'est vrai
Pipeline p = Pipeline.create(options);//Créer une topologie appelée pipeline
//Compléter la topologie pour créer un flux de traitement
- p.apply(PubsubIO.Read.named("ReadFromPubsub").topic("projects/xxxxxxx/topics/input_topic"))//Obtenu sur Pubsub
+ p.apply("ReadFromPubsub", PubsubIO.readStrings().fromTopic("projects/xxxxxxx/topics/input_topic"))//Obtenu sur Pubsub
.apply(Window.<String>into(SlidingWindows.of(Duration.standardSeconds(10)).every(Duration.standardSeconds(5))))//Traitez pendant 10 secondes en glissant toutes les 5 secondes
.apply(ParDo.of(new GetIPFn()))//Obtenir l'adresse IP du journal
.apply(Count.<String>perElement())//Compter par IP
.apply(Top.<KV<String, Long>, KV.OrderByValue<String,Long>>of(3,new KV.OrderByValue<String,Long>()).withoutDefaults())//Extraire le top 3
.apply(ParDo.of(new FormatAsTextFnFromList()))//Convertir au format texte pour pouvoir être publié dans Pubsub
- .apply(PubsubIO.Write.named("WriteCounts").topic("projects/xxxxxxxxxxx/topics/count_ip"));//Publier dans Pubsub
+ .apply("WriteCounts", PubsubIO.writeStrings().to("projects/xxxxxxxxxxx/topics/count_ip"));//Publier dans Pubsub
p.run();//Exécuter le pipeline
}
Consultez la [Note de publication: Dataflow SDK 2.x pour Java] de GCP (https://cloud.google.com/dataflow/release-notes/release-notes-java-2?hl=ja) pour connaître les modifications. Fait.
com.google.cloud.dataflow
à ʻorg.apache.beam`PubsubIO
ont été déplacés vers des sous-packagesPipeline
du nom de la classeBlockingDataflowPipelineRunner
a été supprimé et intégré dans DataflowRunner
@ Override
à @ ProcessElement
PCollection.apply ()
au lieu d'utiliser named ()
ReadStrings ()
](https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO. Instancier avec html # readStrings-) [^ t_read](idem pour l'écriture)topic ()
est [fromTopic ()
](https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO .Read.html # fromTopic-java.lang.String-) / [to ()
](https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk Remplacé par /io/gcp/pubsub/PubsubIO.Write.html#to-java.lang.String-)[^ t_read]: La documentation dit d'instancier avec <T> read ()
, mais c'est privé et ne peut pas être appelé. [readStrings ()
](https://github.com/apache/beam/blob/v2.1.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/ sdk / io / gcp / pubsub / PubsubIO.java # L469) etc. appellent cela à l'intérieur.
Je ne savais même pas comment créer un projet dans Maven, alors notez la commande basée sur Quickstart (https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-maven?hl=ja) Je le ferai. Je l'ai exécuté dans le terminal de la console GCP.
Création de projet
$ mvn archetype:generate \
-DarchetypeArtifactId=google-cloud-dataflow-java-archetypes-examples \
-DarchetypeGroupId=com.google.cloud.dataflow \
-DarchetypeVersion=2.1.0 \
-DgroupId=com.google.cloud.dataflow.examples \
-DartifactId=dos \
-Dversion="0.1" \
-DinteractiveMode=false \
-Dpackage=com.google.cloud.dataflow.examples
$ cd dos/
$ rm -rf src/main/java/com/google/cloud/dataflow/examples/*
$ vim src/main/java/com/google/cloud/dataflow/examples/Dos.java
Compiler et exécuter
$ mvn compile exec:java \
-Dexec.mainClass=com.google.cloud.dataflow.examples.Dos
Recommended Posts