Fensteraggregation von Sensordaten mit Apache Flink und Java 8

Ich habe die Fensteraggregation mit der Apache Flink- und Scala-API für SensorTag-Sensordaten (http://qiita.com/masato/items/e88413a48921f35ef4db) versucht. Scala API so viel wie möglich [Java 8 API](https: // ci. Schreiben Sie unter apache.org/projects/flink/flink-docs-release-1.3/dev/java8.html neu.

Maven Architype

[Flink-quickstart-java](https: /) in Beispielprojekt mit der Java-API Verwenden Sie /mvnrepository.com/artifact/org.apache.flink/flink-quickstart-java), um ein Maven-Projekt zu erstellen. Die Version von Apache Flink ist "1.3.2" und entspricht der von Scala. Ändern Sie groupId und package entsprechend Ihrer Umgebung.

$ mkdir -p ~/java_apps && cd ~/java_apps
$ mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-java \
    -DarchetypeVersion=1.3.2 \
    -DgroupId=streams-flink-java-examples \
    -DartifactId=streams-flink-java-examples \
    -Dversion=0.1 \
    -Dpackage=com.github.masato.streams.flink \
    -DinteractiveMode=false

Ändern Sie die Einstellung maven-compiler-plugin des Plugins in Java 8 (1.8). Fügen Sie außerdem exec-maven-plugin hinzu, damit Maven die Flink-App main () ausführen kann.

pom.xml


    <build>
        <plugins>
...
              <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                  <source>1.8</source>
                  <target>1.8</target>
                </configuration>
              </plugin>

              <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <version>1.5.0</version>
                <executions>
                  <execution>
                    <id>App</id>
                    <goals>
                      <goal>exec</goal>
                    </goals>
                  </execution>
                </executions>
                <configuration>
                  <executable>java</executable>
                  <classpathScope>compile</classpathScope>
                  <arguments>
                    <argument>-cp</argument>
                    <classpath/>
                    <argument>com.github.masato.streams.flink.WordCount</argument>
                  </arguments>
                </configuration>
              </plugin>

Führen Sie das [Ausführungsziel] aus (http://www.mojohaus.org/exec-maven-plugin/examples/example-exec-for-java-programs.html). WordCount Das Beispiel zählt die Wörter im Text und gibt sie standardmäßig aus.

$ mvn clean package exec:exec@App
...
(is,1)
(a,1)
(in,1)
(mind,1)
(or,2)
(against,1)
(arms,1)
(not,1)
(sea,1)
(the,3)
(troubles,1)
(fortune,1)
(take,1)
(to,4)
(and,1)
(arrows,1)
(be,2)
(nobler,1)
(of,2)
(slings,1)
(suffer,1)
(outrageous,1)
(tis,1)
(whether,1)
(question,1)
(that,1)

Fenstertabelle

Löschen Sie den gesamten Java-Code, der vom Archetyp flink-quickstart-java erstellt wurde, und schreiben Sie dann neuen Code. Ich werde.

$ rm src/main/java/com/github/masato/streams/flink/*.java

Der Quellcode befindet sich auch im Repository (https://github.com/masato/streams-flink-java-examples). Das in Scala geschriebene Beispiel war eine Datei, aber im Fall von Java sind die Klassen zum besseren Verständnis getrennt.

$ tree streams-flink-java-examples
streams-flink-java-examples
├── pom.xml
└── src
    └── main
        ├── java
        │   └── com
        │       └── github
        │           └── masato
        │               └── streams
        │                   └── flink
        │                       ├── Accumulator.java
        │                       ├── Aggregate.java
        │                       ├── App.java
        │                       ├── Average.java
        │                       └── Sensor.java
        └── resources
            └── log4j.properties

App.java

Der vollständige Text des Programms, das die Hauptmethode implementiert. Beispiel [App.scala] in Scala geschrieben (https://github.com/masato/streams-flink-scala-examples/blob/master/src/main/scala/com/github/masato/streams/flink/ Ähnlich wie App.scala), aber AggregateFunction /AggregateFunction.html) und [WindowFunction](https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/api/functions/windowing/ WindowFunction.html) wird zu einer Klasse gemacht.

App.java


package com.github.masato.streams.flink;

import java.util.Date;
import java.util.Map;
import java.util.HashMap;
import java.util.Properties;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.JSONDeserializationSchema;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class App {
    private static DateTimeFormatter fmt = DateTimeFormatter.ISO_OFFSET_DATE_TIME;

    private static final String bootstrapServers = System.getenv("BOOTSTRAP_SERVERS_CONFIG");
    private static final String groupId = System.getenv("GROUP_ID");
    private static final String sourceTopic = System.getenv("SOURCE_TOPIC");

    private static final String sinkTopic = System.getenv("SINK_TOPIC");

    public static void main(String[] args) throws Exception {
        final Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);

        final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        final DataStream<ObjectNode> events =
                                env.addSource(new FlinkKafkaConsumer010<>(
                                  sourceTopic,
                                  new JSONDeserializationSchema(),
                                  props)).name("events");

        final SingleOutputStreamOperator<ObjectNode> timestamped =
            events
            .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<ObjectNode>(Time.seconds(10)) {
                    @Override
                    public long extractTimestamp(ObjectNode element) {
                        return element.get("time").asLong() * 1000;
                    }
                });

        timestamped
            .map((v) -> {
                    String key =  v.get("bid").asText();
                    double ambient = v.get("ambient").asDouble();
                    return new Sensor(key, ambient);
            })
            .keyBy(v -> v.key)
            .timeWindow(Time.seconds(60))
            .aggregate(new Aggregate(), new Average())
            .map((v) -> {
                    ZonedDateTime zdt =
                        new Date(v.time).toInstant().atZone(ZoneId.systemDefault());
                    String time = fmt.format(zdt);

                    Map<String, Object> payload = new HashMap<String, Object>();
                    payload.put("time", time);
                    payload.put("bid", v.bid);
                    payload.put("ambient", v.sum);

                    String retval = new ObjectMapper().writeValueAsString(payload);
                    System.out.println(retval);
                    return retval;
                })
            .addSink(new FlinkKafkaProducer010<String>(
                         bootstrapServers,
                         sinkTopic,
                         new SimpleStringSchema())
                     ).name("kafka");

        env.execute();
    }
}

Sensor.java

In Scala haben die Sensordaten des Streams ein Tupel von Scala erstellt, wobei die BD-Adresse als Schlüssel verwendet wurde.

    timestamped
      .map { v =>
        val key =  v.get("bid").asText
        val ambient = v.get("ambient").asDouble
        (key, ambient)
      }

Selbst im Fall von Java 8 mag Tuple 2 Scala /Tuple2.html) kann verwendet werden. Es muss jedoch mit Eclipse JDT kompiliert werden, wie unter Verwenden von Apache Flink mit Java 8 beschrieben. .. Oder [return ()](https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.html#returns -org.apache.flink.api.common.typeinfo.TypeInformation-) bis [TupleTypeInfo](https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache Wenn Sie den Elementtyp-Hinweis in der Java-Klasse in /flink/api/java/typeutils/TupleTypeInfo.html) nicht angeben, tritt ein Fehler auf.

            .map((v) -> {
                double ambient = v.get("value").get("ambient").asDouble();
                String key =  v.get("sensor").get("bid").asText();
                return new Tuple2<>(key, ambient);
            })
            .returns(new TupleTypeInfo<>(TypeInformation.of(String.class),
                                         TypeInformation.of(Double.class)))

Es ist ein wenig mühsam, daher ist es einfacher, gewöhnliches POJO zu verwenden.

Sensor.java


package com.github.masato.streams.flink;

public class Sensor {
    public String key;
    public double ambient;

    public Sensor(String key, double ambient) {
        this.key = key;
        this.ambient = ambient;
    }
}

Aggregate.java

Implementierung der Schnittstelle AggregateFunction Ich werde. Im Gegensatz zu Scala ist Accumulator keine Fallklasse, aber ansonsten ist es fast dieselbe.

Aggregate.java


package com.github.masato.streams.flink;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.api.common.functions.AggregateFunction;

public class Aggregate implements AggregateFunction<Sensor, Accumulator, Accumulator> {

    private static final long serialVersionUID = 3355966737412029618L;

    @Override
    public Accumulator createAccumulator() {
        return new Accumulator(0L, "", 0.0, 0);
    }

    @Override
    public Accumulator merge(Accumulator a, Accumulator b) {
        a.count += b.count;
        a.sum += b.sum;
        return a;
    }

    @Override
    public void add(Sensor value, Accumulator acc) {
        acc.sum += value.ambient;
        acc.count++;
    }

    @Override
    public Accumulator getResult(Accumulator acc) {
        return acc;
    }
}

Average.java

Implementierung von WindowFunction ist.

Average.java


package com.github.masato.streams.flink;

import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.util.Collector;

public class Average implements WindowFunction<Accumulator,
                                               Accumulator, String, TimeWindow> {

    private static final long serialVersionUID = 5532466889638450746L;

    @Override
    public void apply(String key,
                      TimeWindow window,
                      Iterable<Accumulator> input,
                      Collector<Accumulator> out) {

        Accumulator in = input.iterator().next();
        out.collect(new Accumulator(window.getEnd(), key, in.sum/in.count, in.count));
    }
}

Im Fall von Scala habe ich die apply () Implementierung von WindowFunction direkt in aggregat geschrieben.

      .aggregate(new Aggregate(),
        ( key: String,
          window: TimeWindow,
          input: Iterable[Accumulator],
          out: Collector[Accumulator] ) => {
            var in = input.iterator.next()
            out.collect(Accumulator(window.getEnd, key, in.sum/in.count, in.count))
          }
      )

pom.xml

Kafka wird als Quelle des Streams verwendet. Legen Sie die Verbindungsinformationen in der Umgebungsvariablen von [exec-maven-plugin] fest (http://www.mojohaus.org/exec-maven-plugin/). Informationen zur Vorbereitung von SensorTag und Raspberry Pi 3 sowie zum Erstellen eines Kafka-Clusters finden Sie unter hier.

pom.xml


    <build>
        <plugins>
...
              <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                  <source>1.8</source>
                  <target>1.8</target>
                </configuration>
              </plugin>

              <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <version>1.5.0</version>
                <executions>
                  <execution>
                    <id>App</id>
                    <goals>
                      <goal>exec</goal>
                    </goals>
                  </execution>
                </executions>
                <configuration>
                  <executable>java</executable>
                  <classpathScope>compile</classpathScope>
                  <arguments>
                    <argument>-cp</argument>
                    <classpath/>
                    <argument>com.github.masato.streams.flink.App</argument>
                  </arguments>
                  <environmentVariables>
                    <APPLICATION_ID_CONFIG>sensortag</APPLICATION_ID_CONFIG>
                    <BOOTSTRAP_SERVERS_CONFIG>confluent:9092</BOOTSTRAP_SERVERS_CONFIG>
                    <SOURCE_TOPIC>sensortag</SOURCE_TOPIC>
                    <SINK_TOPIC>sensortag-sink</SINK_TOPIC>
                    <GROUP_ID>flinkGroup</GROUP_ID>
                  </environmentVariables>
                </configuration>
              </plugin>

Lauf

Führen Sie das Exec-Ziel des Exec-Maven-Plugins aus, nachdem Sie SensorTag-Daten von Raspberry Pi 3 an Kafka gesendet haben.

$ mvn clean install exec:exec@App

Der Durchschnittswert der Umgebungstemperatur, die in einem 60-Sekunden-Taumelfenster aggregiert wird, wird standardmäßig ausgegeben.

{"ambient":28.395833333333332,"time":"2017-08-28T11:57:00+09:00","bid":"B0:B4:48:BD:DA:03"}
{"ambient":28.44375,"time":"2017-08-28T11:58:00+09:00","bid":"B0:B4:48:BD:DA:03"}
{"ambient":28.46875,"time":"2017-08-28T11:59:00+09:00","bid":"B0:B4:48:BD:DA:03"}
{"ambient":28.5,"time":"2017-08-28T12:00:00+09:00","bid":"B0:B4:48:BD:DA:03"}

Recommended Posts

Fensteraggregation von Sensordaten mit Apache Flink und Java 8
Fensteraggregation von SensorTag mit Kafka Streams
Zusammenfassung des ToString-Verhaltens mit Java- und Groovy-Annotationen
Kompatibilität von Spring JDBC und My Batis mit Spring Data JDBC (vorläufig)
Protokollaggregation und -analyse (Arbeiten mit AWS Athena in Java)
Java-Programmierung (Variablen und Daten)
Datenverarbeitung mit Apache Flink
Apache Hadoop und Java 9 (Teil 1)
[Java] Vereinfachen Sie die Implementierung der Datenverlaufsverwaltung mit Reladomo
Vor- und Nachteile von Java
Lesen Sie die Daten der Shizuoka Prefecture Point Cloud DB mit Java und erstellen Sie Luftbilder und Höhen-PNGs.
Stellen Sie die Sensorinformationen von Raspberry Pi in Java grafisch dar und überprüfen Sie sie mit einem Webbrowser
Lesen Sie die Daten der Shizuoka Prefecture Point Cloud DB mit Java und versuchen Sie, die Baumhöhe zu ermitteln.
Vergleich der WEB-Anwendungsentwicklung mit Rails und Java Servlet + JSP
Verwenden Sie Java mit MSYS und Cygwin
Installieren Sie Java und Tomcat mit Ansible
Über Biocontainer fastqc und Java
Grundlegende Datentypen und Referenztypen (Java)
Verwenden Sie JDBC mit Java und Scala.
PDF und TIFF mit Java 8 ausgeben
[Java] Laufzeitdatenbereiche von JVM
Datenverknüpfung mit Spark und Cassandra
Java-Basisdatentypen und Referenztypen
Mit Java verschlüsseln und mit C # entschlüsseln
Ich möchte Bilder mit REST Controller von Java und Spring anzeigen!
Zeichnen Sie SVG-Pfaddaten (d-Attribut des Pfadelements) mit Java + Apache Batik
Überwachen Sie Java-Anwendungen mit Jolokia und Hawtio
Feder mit Kotorin --2 RestController und Datenklasse
Nach 3 Monaten Java- und Frühlingstraining
Verknüpfen Sie Java- und C ++ - Code mit SWIG
Probieren wir WebSocket mit Java und Javascript aus!
[Java] Behandeln Sie Excel-Dateien mit Apache POI
[Java] Lesen und Schreiben von Dateien mit OpenCSV
[Java / Swift] Vergleich von Java-Schnittstelle und Swift-Protokoll
Zusammenfassung von Java Math.random und Import (Kalender)
[Java] Inhalt der Collection-Schnittstelle und der List-Schnittstelle
Erfassung von JSON-Daten und Drehung von Werten
Diskriminierung von Enum in Java 7 und höher
Ich habe die Daten der Reise (Tagebuchanwendung) in Java erhalten und versucht, sie # 001 zu visualisieren
Erstellen Sie mit Docker eine Apache- und Tomcat-Umgebung. Übrigens Maven & Java Kooperation