[JAVA] Modifications pour exécuter "Utilisation de Cloud Dataflow + Cloud Pub / Sub + Fluentd ..." avec le SDK 2.1

Afin d'apprendre à traiter les flux à l'aide de Cloud Dataflow sur Google Cloud Platform (GCP), tout d'abord, "[Cloud Dataflow + Cloud Pub / Sub + Fluentd a été utilisé pour créer un mécanisme de détection DoS](/ tetsuyam / items / 6636c6bbc3ee7fadfbe3) »a été essayé de fonctionner. Cependant, ** le SDK a changé pour Apache Beam base **, donc cela n'a pas fonctionné tel quel, donc je vais résumer les changements de code.

Différence de code

Plutôt que la sortie diff brute, elle est légèrement réorganisée afin que les différences correspondantes soient les unes à côté des autres.

--- ../Dos.java.orig	2017-11-15 17:22:31.583396781 +0900
+++ src/main/java/com/google/cloud/dataflow/examples/Dos.java	2017-11-15 17:17:45.067769398 +0900
@@ -1,35 +1,35 @@
 package com.google.cloud.dataflow.examples;

-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.PipelineResult;
-import com.google.cloud.dataflow.sdk.io.TextIO;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.io.PubsubIO;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.DefaultValueFactory;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
+// import org.apache.beam.sdk.transforms.Aggregator;
-import com.google.cloud.dataflow.sdk.transforms.Count;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.Combine;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.Sum;
-import com.google.cloud.dataflow.sdk.transforms.Top;
-import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
-import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.KV.OrderByValue;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollectionList;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.Top;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.KV.OrderByValue;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
-import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
-import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
+// import org.apache.beam.sdk.runners.BlockingDataflowRunner;
+import org.apache.beam.runners.dataflow.DataflowRunner;
+import org.apache.beam.runners.direct.DirectRunner;

 import java.util.List;
 import org.joda.time.Duration;
@@ -52,9 +52,9 @@

   static class GetIPFn extends DoFn<String, String> {
     private static final long serialVersionUID = 0;
-    private final Aggregator<Long, Long> emptyLines =
-        createAggregator("emptyLines", new Sum.SumLongFn());
+    // private final Aggregator<Long, Long> emptyLines =
+    //     createAggregator("emptyLines", new Sum.SumLongFn());
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) {
       Gson gson = new Gson();
       ApacheLog log = gson.fromJson(c.element() , ApacheLog.class);
@@ -67,7 +67,7 @@

   public static class FormatAsTextFnFromList extends DoFn<List<KV<String, Long>>, String> {
     private static final long serialVersionUID = 0;
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) {
       for(KV value : c.element()){
         Integer access_count = new Integer(value.getValue().toString());
@@ -79,22 +79,22 @@

   public static void main(String[] args) {
     DataflowPipelineOptions options = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
     options.setProject("xxxxxxxxxxxx");//Spécifiez le projet
     options.setStagingLocation("gs://xxxxxxxxxxxx/staging");//Spécifiez le chemin où le fichier jar sera placé
     options.setJobName("doscheck");//Spécifiez le nom de ce travail
-    options.setRunner(DataflowPipelineRunner.class);//Je n'ai pas correctement recherché Runner. .. ..
+    options.setRunner(DataflowRunner.class);//Je n'ai pas correctement recherché Runner. .. ..
     options.setStreaming(true);//Je veux faire du traitement de flux, c'est vrai

     Pipeline p = Pipeline.create(options);//Créer une topologie appelée pipeline
     //Compléter la topologie pour créer un flux de traitement
-    p.apply(PubsubIO.Read.named("ReadFromPubsub").topic("projects/xxxxxxx/topics/input_topic"))//Obtenu sur Pubsub
+    p.apply("ReadFromPubsub", PubsubIO.readStrings().fromTopic("projects/xxxxxxx/topics/input_topic"))//Obtenu sur Pubsub
      .apply(Window.<String>into(SlidingWindows.of(Duration.standardSeconds(10)).every(Duration.standardSeconds(5))))//Traitez pendant 10 secondes en glissant toutes les 5 secondes
      .apply(ParDo.of(new GetIPFn()))//Obtenir l'adresse IP du journal
      .apply(Count.<String>perElement())//Compter par IP
      .apply(Top.<KV<String, Long>, KV.OrderByValue<String,Long>>of(3,new KV.OrderByValue<String,Long>()).withoutDefaults())//Extraire le top 3
      .apply(ParDo.of(new FormatAsTextFnFromList()))//Convertir au format texte pour pouvoir être publié dans Pubsub
-     .apply(PubsubIO.Write.named("WriteCounts").topic("projects/xxxxxxxxxxx/topics/count_ip"));//Publier dans Pubsub
+     .apply("WriteCounts", PubsubIO.writeStrings().to("projects/xxxxxxxxxxx/topics/count_ip"));//Publier dans Pubsub
     p.run();//Exécuter le pipeline
   }

Consultez la [Note de publication: Dataflow SDK 2.x pour Java] de GCP (https://cloud.google.com/dataflow/release-notes/release-notes-java-2?hl=ja) pour connaître les modifications. Fait.

[^ t_read]: La documentation dit d'instancier avec <T> read (), mais c'est privé et ne peut pas être appelé. [readStrings ()](https://github.com/apache/beam/blob/v2.1.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/ sdk / io / gcp / pubsub / PubsubIO.java # L469) etc. appellent cela à l'intérieur.

appendice

Opération Maven

Je ne savais même pas comment créer un projet dans Maven, alors notez la commande basée sur Quickstart (https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-maven?hl=ja) Je le ferai. Je l'ai exécuté dans le terminal de la console GCP.

Création de projet


$ mvn archetype:generate \
      -DarchetypeArtifactId=google-cloud-dataflow-java-archetypes-examples \
      -DarchetypeGroupId=com.google.cloud.dataflow \
      -DarchetypeVersion=2.1.0 \
      -DgroupId=com.google.cloud.dataflow.examples \
      -DartifactId=dos \
      -Dversion="0.1" \
      -DinteractiveMode=false \
      -Dpackage=com.google.cloud.dataflow.examples
$ cd dos/
$ rm -rf src/main/java/com/google/cloud/dataflow/examples/*
$ vim src/main/java/com/google/cloud/dataflow/examples/Dos.java

Compiler et exécuter


$ mvn compile exec:java \
      -Dexec.mainClass=com.google.cloud.dataflow.examples.Dos

Recommended Posts

Modifications pour exécuter "Utilisation de Cloud Dataflow + Cloud Pub / Sub + Fluentd ..." avec le SDK 2.1
Exécutez XGBoost avec Cloud Dataflow (Python)
GCP: répétez de Pub / Sub vers Cloud Functions et de Cloud Functions vers Pub / Sub
Exécutez un pipeline de machine learning avec Cloud Dataflow (Python)
Cloud Functions pour redimensionner les images à l'aide d'OpenCV avec le déclencheur Cloud Storage
Comment télécharger des fichiers sur Cloud Storage avec le SDK Python de Firebase
Autoriser l'exécution rapide des scripts Python dans Cloud Run à l'aide du répondeur
Pour exécuter gym_torcs avec ubutnu16
Télécharger des fichiers sur Aspera fournis avec IBM Cloud Object Storage (ICOS) à l'aide du SDK (version Python)
Introduction à Apache Beam avec Cloud Dataflow (sur la série 2.0.0) ~ Partie de base ~ ParDo ~
Introduction à Apache Beam avec Google Cloud Dataflow (sur la série 2.0.x) ~ Combine Edition ~