Window aggregation of sensor data with Apache Flink and Java 8

I previously [tried window aggregation of SensorTag sensor data with Apache Flink and the Scala API](http://qiita.com/masato/items/e88413a48921f35ef4db). Here I rewrite that example, as far as possible, with the [Java 8 API](https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/java8.html).

Maven archetype

Create a Maven project for the Java API sample using the [flink-quickstart-java](https://mvnrepository.com/artifact/org.apache.flink/flink-quickstart-java) archetype. The Apache Flink version is 1.3.2, the same as in the Scala example. Change the groupId and package to match your environment.

$ mkdir -p ~/java_apps && cd ~/java_apps
$ mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-java \
    -DarchetypeVersion=1.3.2 \
    -DgroupId=streams-flink-java-examples \
    -DartifactId=streams-flink-java-examples \
    -Dversion=0.1 \
    -Dpackage=com.github.masato.streams.flink \
    -DinteractiveMode=false

Change the maven-compiler-plugin configuration to Java 8 (1.8). Also add exec-maven-plugin so that the Flink app's main() can be run from Maven.

pom.xml


    <build>
        <plugins>
...
              <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                  <source>1.8</source>
                  <target>1.8</target>
                </configuration>
              </plugin>

              <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <version>1.5.0</version>
                <executions>
                  <execution>
                    <id>App</id>
                    <goals>
                      <goal>exec</goal>
                    </goals>
                  </execution>
                </executions>
                <configuration>
                  <executable>java</executable>
                  <classpathScope>compile</classpathScope>
                  <arguments>
                    <argument>-cp</argument>
                    <classpath/>
                    <argument>com.github.masato.streams.flink.WordCount</argument>
                  </arguments>
                </configuration>
              </plugin>

Execute the exec goal. The WordCount example counts the words in a text and prints them to standard output.

$ mvn clean package exec:exec@App
...
(is,1)
(a,1)
(in,1)
(mind,1)
(or,2)
(against,1)
(arms,1)
(not,1)
(sea,1)
(the,3)
(troubles,1)
(fortune,1)
(take,1)
(to,4)
(and,1)
(arrows,1)
(be,2)
(nobler,1)
(of,2)
(slings,1)
(suffer,1)
(outrageous,1)
(tis,1)
(whether,1)
(question,1)
(that,1)

Window aggregation

Delete all the Java code generated by the flink-quickstart-java archetype and write new code.

$ rm src/main/java/com/github/masato/streams/flink/*.java

The source code is also available in the [repository](https://github.com/masato/streams-flink-java-examples). The Scala example fit in a single file, but in the Java version the classes are split up to make them easier to follow.

$ tree streams-flink-java-examples
streams-flink-java-examples
├── pom.xml
└── src
    └── main
        ├── java
        │   └── com
        │       └── github
        │           └── masato
        │               └── streams
        │                   └── flink
        │                       ├── Accumulator.java
        │                       ├── Aggregate.java
        │                       ├── App.java
        │                       ├── Average.java
        │                       └── Sensor.java
        └── resources
            └── log4j.properties

App.java

The full program implementing the main method. It is similar to the Scala version, [App.scala](https://github.com/masato/streams-flink-scala-examples/blob/master/src/main/scala/com/github/masato/streams/flink/App.scala), but [AggregateFunction](https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/api/common/functions/AggregateFunction.html) and [WindowFunction](https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.html) are each implemented as separate classes.

App.java


package com.github.masato.streams.flink;

import java.util.Date;
import java.util.Map;
import java.util.HashMap;
import java.util.Properties;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.JSONDeserializationSchema;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class App {
    private static DateTimeFormatter fmt = DateTimeFormatter.ISO_OFFSET_DATE_TIME;

    private static final String bootstrapServers = System.getenv("BOOTSTRAP_SERVERS_CONFIG");
    private static final String groupId = System.getenv("GROUP_ID");
    private static final String sourceTopic = System.getenv("SOURCE_TOPIC");

    private static final String sinkTopic = System.getenv("SINK_TOPIC");

    public static void main(String[] args) throws Exception {
        final Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);

        final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        final DataStream<ObjectNode> events =
                                env.addSource(new FlinkKafkaConsumer010<>(
                                  sourceTopic,
                                  new JSONDeserializationSchema(),
                                  props)).name("events");

        final SingleOutputStreamOperator<ObjectNode> timestamped =
            events
            .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<ObjectNode>(Time.seconds(10)) {
                    @Override
                    public long extractTimestamp(ObjectNode element) {
                        return element.get("time").asLong() * 1000;
                    }
                });

        timestamped
            .map((v) -> {
                    String key =  v.get("bid").asText();
                    double ambient = v.get("ambient").asDouble();
                    return new Sensor(key, ambient);
            })
            .keyBy(v -> v.key)
            .timeWindow(Time.seconds(60))
            .aggregate(new Aggregate(), new Average())
            .map((v) -> {
                    ZonedDateTime zdt =
                        new Date(v.time).toInstant().atZone(ZoneId.systemDefault());
                    String time = fmt.format(zdt);

                    Map<String, Object> payload = new HashMap<String, Object>();
                    payload.put("time", time);
                    payload.put("bid", v.bid);
                    payload.put("ambient", v.sum);

                    String retval = new ObjectMapper().writeValueAsString(payload);
                    System.out.println(retval);
                    return retval;
                })
            .addSink(new FlinkKafkaProducer010<String>(
                         bootstrapServers,
                         sinkTopic,
                         new SimpleStringSchema())
                     ).name("kafka");

        env.execute();
    }
}
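For reference, the map in App.java reads the fields time, bid, and ambient from each Kafka record, so the SensorTag messages on the source topic are assumed to be JSON objects of roughly the following shape. This record is illustrative, inferred from the code rather than taken from the article, and the actual SensorTag payload may carry additional fields; time is treated as epoch seconds, because extractTimestamp() multiplies it by 1000.

{"time":1503889020,"bid":"B0:B4:48:BD:DA:03","ambient":28.4}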

Sensor.java

In the Scala version, the sensor data in the stream was mapped to a Scala tuple, keyed by the BD address.

    timestamped
      .map { v =>
        val key =  v.get("bid").asText
        val ambient = v.get("ambient").asDouble
        (key, ambient)
      }

As in Scala, [Tuple2](https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/api/java/tuple/Tuple2.html) can also be used from Java 8. However, when it is used from a lambda the project has to be compiled with the Eclipse JDT compiler, as described in [Using Apache Flink with Java 8](https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/java8.html). Alternatively, you have to give a type hint for the element types with [returns()](https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.html#returns-org.apache.flink.api.common.typeinfo.TypeInformation-) and a [TupleTypeInfo](https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.html); otherwise an error occurs.

            .map((v) -> {
                double ambient = v.get("value").get("ambient").asDouble();
                String key =  v.get("sensor").get("bid").asText();
                return new Tuple2<>(key, ambient);
            })
            .returns(new TupleTypeInfo<>(TypeInformation.of(String.class),
                                         TypeInformation.of(Double.class)))

This is a little cumbersome, so it is easier to use a plain POJO.

Sensor.java


package com.github.masato.streams.flink;

public class Sensor {
    public String key;
    public double ambient;

    public Sensor(String key, double ambient) {
        this.key = key;
        this.ambient = ambient;
    }
}

Aggregate.java

This implements the [AggregateFunction](https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/api/common/functions/AggregateFunction.html) interface. Unlike the Scala version, Accumulator is not a case class (a sketch of the Accumulator POJO follows the Average section below); otherwise the code is almost the same.

Aggregate.java


package com.github.masato.streams.flink;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.api.common.functions.AggregateFunction;

public class Aggregate implements AggregateFunction<Sensor, Accumulator, Accumulator> {

    private static final long serialVersionUID = 3355966737412029618L;

    @Override
    public Accumulator createAccumulator() {
        return new Accumulator(0L, "", 0.0, 0);
    }

    @Override
    public Accumulator merge(Accumulator a, Accumulator b) {
        a.count += b.count;
        a.sum += b.sum;
        return a;
    }

    @Override
    public void add(Sensor value, Accumulator acc) {
        acc.sum += value.ambient;
        acc.count++;
    }

    @Override
    public Accumulator getResult(Accumulator acc) {
        return acc;
    }
}

Average.java

This is the implementation of [WindowFunction](https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.html).

Average.java


package com.github.masato.streams.flink;

import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.util.Collector;

public class Average implements WindowFunction<Accumulator,
                                               Accumulator, String, TimeWindow> {

    private static final long serialVersionUID = 5532466889638450746L;

    @Override
    public void apply(String key,
                      TimeWindow window,
                      Iterable<Accumulator> input,
                      Collector<Accumulator> out) {

        Accumulator in = input.iterator().next();
        out.collect(new Accumulator(window.getEnd(), key, in.sum/in.count, in.count));
    }
}

In the Scala version, the `apply()` implementation of WindowFunction was written directly inside `aggregate()`.

      .aggregate(new Aggregate(),
        ( key: String,
          window: TimeWindow,
          input: Iterable[Accumulator],
          out: Collector[Accumulator] ) => {
            var in = input.iterator.next()
            out.collect(Accumulator(window.getEnd, key, in.sum/in.count, in.count))
          }
      )
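Accumulator.java appears in the project tree above but its contents are not reproduced in this article. A minimal sketch that is consistent with the constructor calls in Aggregate and Average, new Accumulator(0L, "", 0.0, 0) and new Accumulator(window.getEnd(), key, in.sum/in.count, in.count), could look like the following; the field names and order are inferred from how the class is used and are assumptions, not the original file.

package com.github.masato.streams.flink;

// Plain POJO used as the accumulator type of Aggregate and as the output of Average.
// Constructor argument order matches the calls above: time, bid, sum, count.
public class Accumulator {
    public long time;   // window end time in epoch milliseconds
    public String bid;  // BD address used as the key
    public double sum;  // running sum of ambient values, or the average in the window result
    public int count;   // number of sensor readings in the window

    public Accumulator(long time, String bid, double sum, int count) {
        this.time = time;
        this.bid = bid;
        this.sum = sum;
        this.count = count;
    }
}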

pom.xml

Kafka is used as the source of the stream. The connection information is set in the environment variables of exec-maven-plugin. Refer to the earlier posts for preparing the SensorTag and Raspberry Pi 3 and for building the Kafka cluster.

pom.xml


    <build>
        <plugins>
...
              <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                  <source>1.8</source>
                  <target>1.8</target>
                </configuration>
              </plugin>

              <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <version>1.5.0</version>
                <executions>
                  <execution>
                    <id>App</id>
                    <goals>
                      <goal>exec</goal>
                    </goals>
                  </execution>
                </executions>
                <configuration>
                  <executable>java</executable>
                  <classpathScope>compile</classpathScope>
                  <arguments>
                    <argument>-cp</argument>
                    <classpath/>
                    <argument>com.github.masato.streams.flink.App</argument>
                  </arguments>
                  <environmentVariables>
                    <APPLICATION_ID_CONFIG>sensortag</APPLICATION_ID_CONFIG>
                    <BOOTSTRAP_SERVERS_CONFIG>confluent:9092</BOOTSTRAP_SERVERS_CONFIG>
                    <SOURCE_TOPIC>sensortag</SOURCE_TOPIC>
                    <SINK_TOPIC>sensortag-sink</SINK_TOPIC>
                    <GROUP_ID>flinkGroup</GROUP_ID>
                  </environmentVariables>
                </configuration>
              </plugin>

Run

Execute the exec goal of exec-maven-plugin after sending SensorTag data from Raspberry Pi 3 to Kafka.

$ mvn clean install exec:exec@App

The average ambient temperature, aggregated in 60-second tumbling windows, is printed to standard output.

{"ambient":28.395833333333332,"time":"2017-08-28T11:57:00+09:00","bid":"B0:B4:48:BD:DA:03"}
{"ambient":28.44375,"time":"2017-08-28T11:58:00+09:00","bid":"B0:B4:48:BD:DA:03"}
{"ambient":28.46875,"time":"2017-08-28T11:59:00+09:00","bid":"B0:B4:48:BD:DA:03"}
{"ambient":28.5,"time":"2017-08-28T12:00:00+09:00","bid":"B0:B4:48:BD:DA:03"}
