Agrégation de fenêtres de données de capteurs avec Apache Flink et Java 8

J'ai [essayé] l'agrégation de fenêtres en utilisant Apache Flink et l'API Scala pour les données des capteurs SensorTag (http://qiita.com/masato/items/e88413a48921f35ef4db). API Scala autant que possible [API Java 8](https: // ci. Réécrivez à apache.org/projects/flink/flink-docs-release-1.3/dev/java8.html).

Maven Architype

[Flink-quickstart-java](https: /) dans Exemple de projet utilisant l'API Java Utilisez /mvnrepository.com/artifact/org.apache.flink/flink-quickstart-java) pour créer un projet Maven. La version d'Apache Flink est «1.3.2», qui est la même que celle de Scala. Modifiez groupId et package en fonction de votre environnement.

$ mkdir -p ~/java_apps && cd ~/java_apps
$ mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-java \
    -DarchetypeVersion=1.3.2 \
    -DgroupId=streams-flink-java-examples \
    -DartifactId=streams-flink-java-examples \
    -Dversion=0.1 \
    -Dpackage=com.github.masato.streams.flink \
    -DinteractiveMode=false

Changez le paramètre [maven-compiler-plugin] du plugin (https://maven.apache.org/plugins/maven-compiler-plugin/) en Java 8 (1.8). Ajoutez également exec-maven-plugin pour permettre à Maven d'exécuter l'application Flink main ().

pom.xml


    <build>
        <plugins>
...
              <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                  <source>1.8</source>
                  <target>1.8</target>
                </configuration>
              </plugin>

              <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <version>1.5.0</version>
                <executions>
                  <execution>
                    <id>App</id>
                    <goals>
                      <goal>exec</goal>
                    </goals>
                  </execution>
                </executions>
                <configuration>
                  <executable>java</executable>
                  <classpathScope>compile</classpathScope>
                  <arguments>
                    <argument>-cp</argument>
                    <classpath/>
                    <argument>com.github.masato.streams.flink.WordCount</argument>
                  </arguments>
                </configuration>
              </plugin>

Exécutez l 'objectif d'exécution. WordCount L'exemple de compte les mots dans le texte et les sort en standard.

$ mvn clean package exec:exec@App
...
(is,1)
(a,1)
(in,1)
(mind,1)
(or,2)
(against,1)
(arms,1)
(not,1)
(sea,1)
(the,3)
(troubles,1)
(fortune,1)
(take,1)
(to,4)
(and,1)
(arrows,1)
(be,2)
(nobler,1)
(of,2)
(slings,1)
(suffer,1)
(outrageous,1)
(tis,1)
(whether,1)
(question,1)
(that,1)

Tabulation des fenêtres

Supprimez tout le code Java créé par l'archétype de flink-quickstart-java puis écrivez un nouveau code. Je vais.

$ rm src/main/java/com/github/masato/streams/flink/*.java

Le code source peut également être trouvé dans le référentiel (https://github.com/masato/streams-flink-java-examples). L 'exemple écrit en Scala était un fichier, mais dans le cas de Java, les classes sont séparées pour faciliter la compréhension.

$ tree streams-flink-java-examples
streams-flink-java-examples
├── pom.xml
└── src
    └── main
        ├── java
        │   └── com
        │       └── github
        │           └── masato
        │               └── streams
        │                   └── flink
        │                       ├── Accumulator.java
        │                       ├── Aggregate.java
        │                       ├── App.java
        │                       ├── Average.java
        │                       └── Sensor.java
        └── resources
            └── log4j.properties

App.java

Le texte complet du programme qui implémente la méthode principale. Exemple écrit en Scala [App.scala](https://github.com/masato/streams-flink-scala-examples/blob/master/src/main/scala/com/github/masato/streams/flink/ Similaire à App.scala) mais AggregateFunction /AggregateFunction.html) et [WindowFunction](https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/api/functions/windowing/ WindowFunction.html) est transformé en une classe.

App.java


package com.github.masato.streams.flink;

import java.util.Date;
import java.util.Map;
import java.util.HashMap;
import java.util.Properties;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.JSONDeserializationSchema;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class App {
    private static DateTimeFormatter fmt = DateTimeFormatter.ISO_OFFSET_DATE_TIME;

    private static final String bootstrapServers = System.getenv("BOOTSTRAP_SERVERS_CONFIG");
    private static final String groupId = System.getenv("GROUP_ID");
    private static final String sourceTopic = System.getenv("SOURCE_TOPIC");

    private static final String sinkTopic = System.getenv("SINK_TOPIC");

    public static void main(String[] args) throws Exception {
        final Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);

        final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        final DataStream<ObjectNode> events =
                                env.addSource(new FlinkKafkaConsumer010<>(
                                  sourceTopic,
                                  new JSONDeserializationSchema(),
                                  props)).name("events");

        final SingleOutputStreamOperator<ObjectNode> timestamped =
            events
            .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<ObjectNode>(Time.seconds(10)) {
                    @Override
                    public long extractTimestamp(ObjectNode element) {
                        return element.get("time").asLong() * 1000;
                    }
                });

        timestamped
            .map((v) -> {
                    String key =  v.get("bid").asText();
                    double ambient = v.get("ambient").asDouble();
                    return new Sensor(key, ambient);
            })
            .keyBy(v -> v.key)
            .timeWindow(Time.seconds(60))
            .aggregate(new Aggregate(), new Average())
            .map((v) -> {
                    ZonedDateTime zdt =
                        new Date(v.time).toInstant().atZone(ZoneId.systemDefault());
                    String time = fmt.format(zdt);

                    Map<String, Object> payload = new HashMap<String, Object>();
                    payload.put("time", time);
                    payload.put("bid", v.bid);
                    payload.put("ambient", v.sum);

                    String retval = new ObjectMapper().writeValueAsString(payload);
                    System.out.println(retval);
                    return retval;
                })
            .addSink(new FlinkKafkaProducer010<String>(
                         bootstrapServers,
                         sinkTopic,
                         new SimpleStringSchema())
                     ).name("kafka");

        env.execute();
    }
}

Sensor.java

Dans Scala, les données du capteur du flux ont créé un Tuple de Scala en utilisant l'adresse BD comme clé.

    timestamped
      .map { v =>
        val key =  v.get("bid").asText
        val ambient = v.get("ambient").asDouble
        (key, ambient)
      }

Même dans le cas de Java 8, Tuple 2 comme Scala /Tuple2.html) peut être utilisé. Cependant, il doit être compilé avec Eclipse JDT comme décrit dans Utilisation d'Apache Flink avec Java 8. .. Ou [Returns ()](https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.html#returns -org.apache.flink.api.common.typeinfo.TypeInformation-) à [TupleTypeInfo](https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache Si vous ne spécifiez pas l'indication de type d'élément dans la classe Java dans /flink/api/java/typeutils/TupleTypeInfo.html), une erreur se produit.

            .map((v) -> {
                double ambient = v.get("value").get("ambient").asDouble();
                String key =  v.get("sensor").get("bid").asText();
                return new Tuple2<>(key, ambient);
            })
            .returns(new TupleTypeInfo<>(TypeInformation.of(String.class),
                                         TypeInformation.of(Double.class)))

C'est un peu gênant, il est donc plus facile d'utiliser un POJO ordinaire.

Sensor.java


package com.github.masato.streams.flink;

public class Sensor {
    public String key;
    public double ambient;

    public Sensor(String key, double ambient) {
        this.key = key;
        this.ambient = ambient;
    }
}

Aggregate.java

Implémentation de l'interface AggregateFunction Je vais. Contrairement à Scala, Accumulator n'est pas une classe de cas, mais sinon c'est presque la même chose.

Aggregate.java


package com.github.masato.streams.flink;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.api.common.functions.AggregateFunction;

public class Aggregate implements AggregateFunction<Sensor, Accumulator, Accumulator> {

    private static final long serialVersionUID = 3355966737412029618L;

    @Override
    public Accumulator createAccumulator() {
        return new Accumulator(0L, "", 0.0, 0);
    }

    @Override
    public Accumulator merge(Accumulator a, Accumulator b) {
        a.count += b.count;
        a.sum += b.sum;
        return a;
    }

    @Override
    public void add(Sensor value, Accumulator acc) {
        acc.sum += value.ambient;
        acc.count++;
    }

    @Override
    public Accumulator getResult(Accumulator acc) {
        return acc;
    }
}

Average.java

Implémentation de WindowFunction est.

Average.java


package com.github.masato.streams.flink;

import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.util.Collector;

public class Average implements WindowFunction<Accumulator,
                                               Accumulator, String, TimeWindow> {

    private static final long serialVersionUID = 5532466889638450746L;

    @Override
    public void apply(String key,
                      TimeWindow window,
                      Iterable<Accumulator> input,
                      Collector<Accumulator> out) {

        Accumulator in = input.iterator().next();
        out.collect(new Accumulator(window.getEnd(), key, in.sum/in.count, in.count));
    }
}

Dans le cas de Scala, j'ai écrit l'implémentation ʻapply () de WindowFunction directement dans ʻaggregate.

      .aggregate(new Aggregate(),
        ( key: String,
          window: TimeWindow,
          input: Iterable[Accumulator],
          out: Collector[Accumulator] ) => {
            var in = input.iterator.next()
            out.collect(Accumulator(window.getEnd, key, in.sum/in.count, in.count))
          }
      )

pom.xml

Kafka est utilisé comme source du flux. Définissez les informations de connexion dans la variable d'environnement de exec-maven-plugin. Veuillez vous référer à here pour préparer SensorTag et Raspberry Pi 3 et construire le cluster Kafka.

pom.xml


    <build>
        <plugins>
...
              <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                  <source>1.8</source>
                  <target>1.8</target>
                </configuration>
              </plugin>

              <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <version>1.5.0</version>
                <executions>
                  <execution>
                    <id>App</id>
                    <goals>
                      <goal>exec</goal>
                    </goals>
                  </execution>
                </executions>
                <configuration>
                  <executable>java</executable>
                  <classpathScope>compile</classpathScope>
                  <arguments>
                    <argument>-cp</argument>
                    <classpath/>
                    <argument>com.github.masato.streams.flink.App</argument>
                  </arguments>
                  <environmentVariables>
                    <APPLICATION_ID_CONFIG>sensortag</APPLICATION_ID_CONFIG>
                    <BOOTSTRAP_SERVERS_CONFIG>confluent:9092</BOOTSTRAP_SERVERS_CONFIG>
                    <SOURCE_TOPIC>sensortag</SOURCE_TOPIC>
                    <SINK_TOPIC>sensortag-sink</SINK_TOPIC>
                    <GROUP_ID>flinkGroup</GROUP_ID>
                  </environmentVariables>
                </configuration>
              </plugin>

Courir

Exécutez l'objectif d'exécution de exec-maven-plugin après l'envoi des données SensorTag de Raspberry Pi 3 à Kafka.

$ mvn clean install exec:exec@App

La valeur moyenne de la température ambiante agrégée dans une fenêtre de culbutage de 60 secondes est sortie en standard.

{"ambient":28.395833333333332,"time":"2017-08-28T11:57:00+09:00","bid":"B0:B4:48:BD:DA:03"}
{"ambient":28.44375,"time":"2017-08-28T11:58:00+09:00","bid":"B0:B4:48:BD:DA:03"}
{"ambient":28.46875,"time":"2017-08-28T11:59:00+09:00","bid":"B0:B4:48:BD:DA:03"}
{"ambient":28.5,"time":"2017-08-28T12:00:00+09:00","bid":"B0:B4:48:BD:DA:03"}

Recommended Posts

Agrégation de fenêtres de données de capteurs avec Apache Flink et Java 8
Agrégation de fenêtres de SensorTag avec Kafka Streams
Résumé du comportement de ToString avec les annotations Java et Groovy
Compatibilité de Spring JDBC et My Batis avec Spring Data JDBC (provisoire)
Agrégation et analyse de journaux (utilisation d'AWS Athena en Java)
Programmation Java (variables et données)
Traitement des données avec Apache Flink
Apache Hadoop et Java 9 (partie 1)
[Java] Simplifiez la mise en œuvre de la gestion de l'historique des données avec Reladomo
Avantages et inconvénients de Java
Lisez les données de la base de données de nuages de points de la préfecture de Shizuoka avec Java et générez une photographie aérienne et une élévation PNG.
Représentez graphiquement les informations du capteur de Raspberry Pi en Java et vérifiez-les avec un navigateur Web
Lisez les données de Shizuoka Prefecture Point Cloud DB avec Java et essayez de détecter la hauteur de l'arbre.
Comparaison du développement d'applications WEB avec Rails et Java Servlet + JSP
Utiliser java avec MSYS et Cygwin
Installez Java et Tomcat avec Ansible
À propos de Biocontainers fastqc et Java
Types de données de base et types de référence (Java)
Utilisez JDBC avec Java et Scala.
Sortie PDF et TIFF avec Java 8
[Java] Zones de données d'exécution de JVM
Liaison de données avec Spark et Cassandra
Types de données de base et types de référence Java
Crypter avec Java et décrypter avec C #
Je veux afficher des images avec REST Controller de Java et Spring!
Dessinez les données de chemin SVG (attribut d de l'élément de chemin) avec Java + Apache Batik
Surveillez les applications Java avec jolokia et hawtio
Spring avec Kotorin --2 RestController et Data Class
Après 3 mois de formation Java et Spring
Lier le code Java et C ++ avec SWIG
Essayons WebSocket avec Java et javascript!
[Java] Gérer les fichiers Excel avec Apache POI
[Java] Lecture et écriture de fichiers avec OpenCSV
[Java / Swift] Comparaison de l'interface Java et du protocole Swift
Résumé de Java Math.random et import (calendrier)
[Java] Contenu de l'interface de collection et de l'interface de liste
Acquisition de données JSON et rotation des valeurs
Discrimination d'énum dans Java 7 et supérieur
J'ai reçu les données du voyage (application agenda) en Java et j'ai essayé de les visualiser # 001
Créez un environnement Apache et Tomcat avec Docker. À propos, coopération Maven & Java