I previously [tried window aggregation of SensorTag sensor data using Apache Flink and the Scala API](http://qiita.com/masato/items/e88413a48921f35ef4db). This time I rewrite the Scala example using the [Java 8 API](https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/java8.html) as much as possible.
Create a Maven project for the Java API sample using [flink-quickstart-java](https://mvnrepository.com/artifact/org.apache.flink/flink-quickstart-java). The Apache Flink version is 1.3.2, the same as in the Scala example. Change the groupId and package to match your environment.
$ mkdir -p ~/java_apps && cd ~/java_apps
$ mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.3.2 \
-DgroupId=streams-flink-java-examples \
-DartifactId=streams-flink-java-examples \
-Dversion=0.1 \
-Dpackage=com.github.masato.streams.flink \
-DinteractiveMode=false
Change the maven-compiler-plugin settings to Java 8 (1.8). Also add exec-maven-plugin so that Maven can run the Flink app's main() directly.
pom.xml
<build>
<plugins>
...
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.5.0</version>
<executions>
<execution>
<id>App</id>
<goals>
<goal>exec</goal>
</goals>
</execution>
</executions>
<configuration>
<executable>java</executable>
<classpathScope>compile</classpathScope>
<arguments>
<argument>-cp</argument>
<classpath/>
<argument>com.github.masato.streams.flink.WordCount</argument>
</arguments>
</configuration>
</plugin>
</plugins>
</build>
Execute the exec goal. The WordCount example counts the words in a text and prints the result to standard output.
$ mvn clean package exec:exec@App
...
(is,1)
(a,1)
(in,1)
(mind,1)
(or,2)
(against,1)
(arms,1)
(not,1)
(sea,1)
(the,3)
(troubles,1)
(fortune,1)
(take,1)
(to,4)
(and,1)
(arrows,1)
(be,2)
(nobler,1)
(of,2)
(slings,1)
(suffer,1)
(outrageous,1)
(tis,1)
(whether,1)
(question,1)
(that,1)
Delete all the Java code generated by the flink-quickstart-java archetype, then write the new code.
$ rm src/main/java/com/github/masato/streams/flink/*.java
The source code is also available in the [repository](https://github.com/masato/streams-flink-java-examples). The Scala example was a single file, but in Java the classes are separated for clarity.
$ tree streams-flink-java-examples
streams-flink-java-examples
├── pom.xml
└── src
└── main
├── java
│ └── com
│ └── github
│ └── masato
│ └── streams
│ └── flink
│ ├── Accumulator.java
│ ├── Aggregate.java
│ ├── App.java
│ ├── Average.java
│ └── Sensor.java
└── resources
└── log4j.properties
App.java
The full text of the program that implements the main method. It is similar to the Scala example, [App.scala](https://github.com/masato/streams-flink-scala-examples/blob/master/src/main/scala/com/github/masato/streams/flink/App.scala), but the [AggregateFunction](https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/api/common/functions/AggregateFunction.html) and the [WindowFunction](https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.html) are each implemented as separate classes.
App.java
package com.github.masato.streams.flink;
import java.util.Date;
import java.util.Map;
import java.util.HashMap;
import java.util.Properties;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.JSONDeserializationSchema;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
public class App {
private static DateTimeFormatter fmt = DateTimeFormatter.ISO_OFFSET_DATE_TIME;
private static final String bootstrapServers = System.getenv("BOOTSTRAP_SERVERS_CONFIG");
private static final String groupId = System.getenv("GROUP_ID");
private static final String sourceTopic = System.getenv("SOURCE_TOPIC");
private static final String sinkTopic = System.getenv("SINK_TOPIC");
public static void main(String[] args) throws Exception {
final Properties props = new Properties();
props.setProperty("bootstrap.servers", bootstrapServers);
props.setProperty("group.id", groupId);
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
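        // use event time taken from each record rather than processing time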
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
final DataStream<ObjectNode> events =
env.addSource(new FlinkKafkaConsumer010<>(
sourceTopic,
new JSONDeserializationSchema(),
props)).name("events");
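        // assign event-time timestamps and watermarks, tolerating up to 10 seconds of out-of-order records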
final SingleOutputStreamOperator<ObjectNode> timestamped =
events
.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor<ObjectNode>(Time.seconds(10)) {
@Override
public long extractTimestamp(ObjectNode element) {
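                    // the "time" field appears to be epoch seconds; Flink expects milliseconds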
return element.get("time").asLong() * 1000;
}
});
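        // key by BD address and average the ambient temperature over 60-second tumbling windows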
timestamped
.map((v) -> {
String key = v.get("bid").asText();
double ambient = v.get("ambient").asDouble();
return new Sensor(key, ambient);
})
.keyBy(v -> v.key)
.timeWindow(Time.seconds(60))
.aggregate(new Aggregate(), new Average())
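            // format each window result as a JSON string for the sink topic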
.map((v) -> {
ZonedDateTime zdt =
new Date(v.time).toInstant().atZone(ZoneId.systemDefault());
String time = fmt.format(zdt);
Map<String, Object> payload = new HashMap<String, Object>();
payload.put("time", time);
payload.put("bid", v.bid);
payload.put("ambient", v.sum);
String retval = new ObjectMapper().writeValueAsString(payload);
System.out.println(retval);
return retval;
})
.addSink(new FlinkKafkaProducer010<String>(
bootstrapServers,
sinkTopic,
new SimpleStringSchema())
).name("kafka");
env.execute();
}
}
Sensor.java
In the Scala version, the stream of sensor data was mapped to a Scala tuple using the BD address as the key.
timestamped
.map { v =>
val key = v.get("bid").asText
val ambient = v.get("ambient").asDouble
(key, ambient)
}
[Tuple2](https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/api/java/tuple/Tuple2.html) can be used from Java 8 as well, just like in Scala. However, the code then has to be compiled with the Eclipse JDT compiler, as described in [Using Apache Flink with Java 8](https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/java8.html). Alternatively, you must give a hint for the tuple's element types with [returns()](https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.html#returns-org.apache.flink.api.common.typeinfo.TypeInformation-) and a [TupleTypeInfo](https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.html); otherwise an error occurs.
.map((v) -> {
double ambient = v.get("value").get("ambient").asDouble();
String key = v.get("sensor").get("bid").asText();
return new Tuple2<>(key, ambient);
})
.returns(new TupleTypeInfo<>(TypeInformation.of(String.class),
TypeInformation.of(Double.class)))
This is a bit of a hassle, so it is easier to use a plain POJO.
Sensor.java
package com.github.masato.streams.flink;
public class Sensor {
public String key;
public double ambient;
public Sensor(String key, double ambient) {
this.key = key;
this.ambient = ambient;
}
}
Aggregate.java
Implements the [AggregateFunction](https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/api/common/functions/AggregateFunction.html) interface. Unlike the Scala version, Accumulator is not a case class, but otherwise it is almost the same.
Aggregate.java
package com.github.masato.streams.flink;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.common.functions.AggregateFunction;
public class Aggregate implements AggregateFunction<Sensor, Accumulator, Accumulator> {
private static final long serialVersionUID = 3355966737412029618L;
@Override
public Accumulator createAccumulator() {
return new Accumulator(0L, "", 0.0, 0);
}
@Override
public Accumulator merge(Accumulator a, Accumulator b) {
a.count += b.count;
a.sum += b.sum;
return a;
}
@Override
public void add(Sensor value, Accumulator acc) {
acc.sum += value.ambient;
acc.count++;
}
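    // the accumulator is returned as-is; the Average WindowFunction computes the final mean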
@Override
public Accumulator getResult(Accumulator acc) {
return acc;
}
}
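Accumulator.java
The Accumulator POJO itself is not listed in this post; the actual file is in the repository. A minimal sketch, inferred from how it is constructed in Aggregate and Average and read in App (the field order and types are assumptions based on that usage), would look like this:
Accumulator.java
package com.github.masato.streams.flink;
// Sketch of the Accumulator POJO inferred from its usage in this post;
// see Accumulator.java in the repository for the actual implementation.
public class Accumulator {
    public long time;    // window end timestamp in milliseconds
    public String bid;   // BD address of the sensor
    public double sum;   // running sum of ambient readings (holds the average in window results)
    public long count;   // number of readings in the window

    public Accumulator(long time, String bid, double sum, long count) {
        this.time = time;
        this.bid = bid;
        this.sum = sum;
        this.count = count;
    }
}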
Average.java
The implementation of [WindowFunction](https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.html).
Average.java
package com.github.masato.streams.flink;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.util.Collector;
public class Average implements WindowFunction<Accumulator,
Accumulator, String, TimeWindow> {
private static final long serialVersionUID = 5532466889638450746L;
@Override
public void apply(String key,
TimeWindow window,
Iterable<Accumulator> input,
Collector<Accumulator> out) {
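        // with incremental aggregation the iterable contains exactly one pre-aggregated Accumulator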
Accumulator in = input.iterator().next();
out.collect(new Accumulator(window.getEnd(), key, in.sum/in.count, in.count));
}
}
In the Scala version, the apply() implementation of the WindowFunction was written directly inside the aggregate() call.
.aggregate(new Aggregate(),
( key: String,
window: TimeWindow,
input: Iterable[Accumulator],
out: Collector[Accumulator] ) => {
var in = input.iterator.next()
out.collect(Accumulator(window.getEnd, key, in.sum/in.count, in.count))
}
)
pom.xml
Kafka is used as the source of the stream. The connection information is set via the environment variables of exec-maven-plugin. Refer to the previous posts for preparing the SensorTag and Raspberry Pi 3 and building the Kafka cluster.
pom.xml
<build>
<plugins>
...
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.5.0</version>
<executions>
<execution>
<id>App</id>
<goals>
<goal>exec</goal>
</goals>
</execution>
</executions>
<configuration>
<executable>java</executable>
<classpathScope>compile</classpathScope>
<arguments>
<argument>-cp</argument>
<classpath/>
<argument>com.github.masato.streams.flink.App</argument>
</arguments>
<environmentVariables>
<APPLICATION_ID_CONFIG>sensortag</APPLICATION_ID_CONFIG>
<BOOTSTRAP_SERVERS_CONFIG>confluent:9092</BOOTSTRAP_SERVERS_CONFIG>
<SOURCE_TOPIC>sensortag</SOURCE_TOPIC>
<SINK_TOPIC>sensortag-sink</SINK_TOPIC>
<GROUP_ID>flinkGroup</GROUP_ID>
</environmentVariables>
</configuration>
</plugin>
</plugins>
</build>
Execute the exec goal of exec-maven-plugin after sending SensorTag data from Raspberry Pi 3 to Kafka.
$ mvn clean install exec:exec@App
The average ambient temperature, aggregated over a 60-second tumbling window, is printed to standard output.
{"ambient":28.395833333333332,"time":"2017-08-28T11:57:00+09:00","bid":"B0:B4:48:BD:DA:03"}
{"ambient":28.44375,"time":"2017-08-28T11:58:00+09:00","bid":"B0:B4:48:BD:DA:03"}
{"ambient":28.46875,"time":"2017-08-28T11:59:00+09:00","bid":"B0:B4:48:BD:DA:03"}
{"ambient":28.5,"time":"2017-08-28T12:00:00+09:00","bid":"B0:B4:48:BD:DA:03"}