$ mkdir -p ~/java_apps && cd ~/java_apps
$ mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-java \
    -DarchetypeVersion=1.3.2 \
    -DgroupId=streams-flink-java-examples \
    -DartifactId=streams-flink-java-examples \
    -Dversion=0.1 \
    -Dpackage=com.github.masato.streams.flink
Change the maven-compiler-plugin setting to Java 8 (1.8). Also add exec-maven-plugin so that Maven can run the main() method of the Flink app.
Run the exec goal. The generated WordCount example counts the words in a text and writes the counts to standard output.
$ mvn clean package exec:exec@App
Delete all the Java code generated by the flink-quickstart-java archetype, then write new code.
$ rm src/main/java/com/github/masato/streams/flink/*.java
The source code is also available in the repository (https://github.com/masato/streams-flink-java-examples). The Scala example was a single file, but in the Java version the classes are split into separate files to make them easier to follow.
$ tree streams-flink-java-examples
├── pom.xml
└── src
    └── main
        ├── java
        │   └── com
        │       └── github
        │           └── masato
        │               └── streams
        │                   └── flink
        │                       ├── Accumulator.java
        │                       ├── Aggregate.java
        │                       ├── App.java
        │                       ├── Average.java
        │                       └── Sensor.java
        └── resources
            └── log4j.properties
This is the full text of the program that implements the main method. It is similar to the Scala example [App.scala](https://github.com/masato/streams-flink-scala-examples/blob/master/src/main/scala/com/github/masato/streams/flink/App.scala), except that the AggregateFunction and the [WindowFunction](https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.html) are each implemented as a separate class.
package com.github.masato.streams.flink;

import java.util.Date;
import java.util.Map;
import java.util.HashMap;
import java.util.Properties;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.JSONDeserializationSchema;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class App {
    private static DateTimeFormatter fmt = DateTimeFormatter.ISO_OFFSET_DATE_TIME;

    // Kafka connection settings come from environment variables.
    private static final String bootstrapServers = System.getenv("BOOTSTRAP_SERVERS_CONFIG");
    private static final String groupId = System.getenv("GROUP_ID");
    private static final String sourceTopic = System.getenv("SOURCE_TOPIC");
    private static final String sinkTopic = System.getenv("SINK_TOPIC");

    public static void main(String[] args) throws Exception {
        final Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);

        // Windows are evaluated on event time, taken from each record's "time" field.
        final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        final DataStream<ObjectNode> events =
            env.addSource(new FlinkKafkaConsumer010<>(
                          sourceTopic,
                          new JSONDeserializationSchema(),
                          props));

        // Tolerate records that arrive up to 10 seconds out of order.
        final SingleOutputStreamOperator<ObjectNode> timestamped =
            events.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<ObjectNode>(Time.seconds(10)) {
                    @Override
                    public long extractTimestamp(ObjectNode element) {
                        // "time" is in seconds; Flink expects milliseconds.
                        return element.get("time").asLong() * 1000;
                    }
                });

        timestamped
            .map((v) -> {
                    String key = v.get("bid").asText();
                    double ambient = v.get("ambient").asDouble();
                    return new Sensor(key, ambient);
                })
            .keyBy(v -> v.key)
            // 60-second tumbling window, the window size stated in this article.
            .timeWindow(Time.seconds(60))
            .aggregate(new Aggregate(), new Average())
            .map((v) -> {
                    ZonedDateTime zdt =
                        new Date(v.time).toInstant().atZone(ZoneId.systemDefault());
                    String time = fmt.format(zdt);
                    Map<String, Object> payload = new HashMap<String, Object>();
                    payload.put("time", time);
                    payload.put("bid", v.bid);
                    // Average stores the window average in the sum field.
                    payload.put("ambient", v.sum);
                    String retval = new ObjectMapper().writeValueAsString(payload);
                    // Echo each result to standard output as well as to Kafka.
                    System.out.println(retval);
                    return retval;
                })
            .addSink(new FlinkKafkaProducer010<String>(
                         bootstrapServers,
                         sinkTopic,
                         new SimpleStringSchema()));

        env.execute();
    }
}
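The time handling in App.java can be tried without a Flink cluster. The following is a minimal plain-Java sketch, not part of the repository: the class and method names are hypothetical, it assumes the `time` field holds epoch seconds, and it pins the output to UTC for reproducibility where the job uses `ZoneId.systemDefault()`. The watermark method shows only the basic idea behind a bounded out-of-orderness extractor (largest timestamp seen minus the bound).

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Plain-Java sketch (no Flink dependency); class and method names are hypothetical.
final class EventTimeSketch {

    // Assumption: the "time" field holds epoch seconds, so scale it to milliseconds.
    static long toMillis(long epochSeconds) {
        return epochSeconds * 1000L;
    }

    // A bounded out-of-orderness watermark trails the largest timestamp seen by a fixed bound.
    static long watermark(long maxTimestampMillis, long boundMillis) {
        return maxTimestampMillis - boundMillis;
    }

    // Formats epoch milliseconds with ISO_OFFSET_DATE_TIME, pinned to UTC here
    // (the job itself uses the system default zone).
    static String toIso(long epochMillis) {
        return DateTimeFormatter.ISO_OFFSET_DATE_TIME
            .format(Instant.ofEpochMilli(epochMillis).atZone(ZoneOffset.UTC));
    }

    public static void main(String[] args) {
        long ts = toMillis(1500000000L);
        System.out.println(ts);
        System.out.println(watermark(ts, 10_000L));
        System.out.println(toIso(ts));
    }
}
```

With a 10-second bound, a record is considered late only once a record at least 10 seconds newer has been seen.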
In the Scala version, each sensor reading in the stream was mapped to a Scala tuple, with the BD address as the key.
.map { v =>
  val key = v.get("bid").asText
  val ambient = v.get("ambient").asDouble
  (key, ambient)
}
Java 8 can also use Tuple2, as in Scala. However, as described in Using Apache Flink with Java 8, the code then has to be compiled with Eclipse JDT. Otherwise, you must pass a [TupleTypeInfo](https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.html) to [returns()](https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.html#returns-org.apache.flink.api.common.typeinfo.TypeInformation-) as a type hint that names the Java class of each tuple element, or an error occurs.
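.map((v) -> {
    double ambient = v.get("value").get("ambient").asDouble();
    String key = v.get("sensor").get("bid").asText();
    return new Tuple2<>(key, ambient);
})
// Type hint for Tuple2<String, Double>, one TypeInformation per element.
.returns(new TupleTypeInfo<>(TypeInformation.of(String.class),
                             TypeInformation.of(Double.class)))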
That is somewhat cumbersome, so it is easier to use a plain POJO.
package com.github.masato.streams.flink;

public class Sensor {
    public String key;
    public double ambient;

    public Sensor(String key, double ambient) {
        this.key = key;
        this.ambient = ambient;
    }
}
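One detail worth knowing about POJOs in Flink: a class is handled as a POJO type only if, among other rules, it is public and has a public no-argument constructor. A class without one still works, but Flink falls back to generic serialization for it. The following is a hypothetical variant of Sensor that adds such a constructor, with a small reflection check. The outer class and the helper method are illustrative names, not part of the repository.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Modifier;

// Hypothetical variant of Sensor, nested here only to keep the sketch in one file.
final class PojoSketch {

    public static class Sensor {
        public String key;
        public double ambient;

        // Public no-argument constructor, one of Flink's POJO requirements.
        public Sensor() {}

        public Sensor(String key, double ambient) {
            this.key = key;
            this.ambient = ambient;
        }
    }

    // Checks the constructor rule only; Flink applies further rules
    // (public class, public fields or getters and setters).
    static boolean hasPublicNoArgConstructor(Class<?> type) {
        try {
            Constructor<?> ctor = type.getDeclaredConstructor();
            return Modifier.isPublic(ctor.getModifiers());
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(hasPublicNoArgConstructor(Sensor.class));
    }
}
```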
Next, implement the AggregateFunction interface. Unlike in Scala, Accumulator is not a case class, but otherwise the code is almost the same.
package com.github.masato.streams.flink;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.common.functions.AggregateFunction;

public class Aggregate implements AggregateFunction<Sensor, Accumulator, Accumulator> {
    private static final long serialVersionUID = 3355966737412029618L;

    public Accumulator createAccumulator() {
        return new Accumulator(0L, "", 0.0, 0);
    }

    public Accumulator merge(Accumulator a, Accumulator b) {
        a.count += b.count;
        a.sum += b.sum;
        return a;
    }

    // In Flink 1.3, add() returns void and updates the accumulator in place.
    public void add(Sensor value, Accumulator acc) {
        acc.sum += value.ambient;
        acc.count++; // count each reading so that Average can divide sum by count
    }

    public Accumulator getResult(Accumulator acc) {
        return acc;
    }
}
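The accumulator lifecycle can be checked in isolation. The following is a minimal plain-Java sketch under stated assumptions: the `Acc` class is a hypothetical stand-in for Accumulator that keeps only `sum` and `count`, and `add` is assumed to increment the count along with the sum so that a later division yields the average.

```java
// Plain-Java sketch of the accumulator lifecycle (no Flink dependency).
// Acc is a hypothetical stand-in for the article's Accumulator class.
final class AggregateSketch {

    static final class Acc {
        double sum;
        int count;
    }

    // Start of a window: an empty accumulator.
    static Acc createAccumulator() {
        return new Acc();
    }

    // Called once per reading: update the running sum and the count.
    static void add(double ambient, Acc acc) {
        acc.sum += ambient;
        acc.count++;
    }

    // Combines two partial accumulators into the first one.
    static Acc merge(Acc a, Acc b) {
        a.sum += b.sum;
        a.count += b.count;
        return a;
    }

    // The division that the window function performs at the end of the window.
    static double average(Acc acc) {
        return acc.sum / acc.count;
    }

    public static void main(String[] args) {
        Acc a = createAccumulator();
        add(20.0, a);
        add(22.0, a);
        Acc b = createAccumulator();
        add(24.0, b);
        System.out.println(average(merge(a, b)));
    }
}
```

Keeping only a sum and a count means the window never has to buffer individual readings.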
Here is the WindowFunction implementation.
package com.github.masato.streams.flink;

import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.util.Collector;

public class Average implements WindowFunction<Accumulator,
                                               Accumulator, String, TimeWindow> {
    private static final long serialVersionUID = 5532466889638450746L;

    public void apply(String key,
                      TimeWindow window,
                      Iterable<Accumulator> input,
                      Collector<Accumulator> out) {
        Accumulator in = input.iterator().next();
        out.collect(new Accumulator(window.getEnd(), key, in.sum/in.count, in.count));
    }
}
In the Scala version, I wrote the WindowFunction's `apply()` implementation directly in the call to `aggregate`.
.aggregate(new Aggregate(),
  ( key: String,
    window: TimeWindow,
    input: Iterable[Accumulator],
    out: Collector[Accumulator] ) => {
      var in = input.iterator.next()
      out.collect(Accumulator(window.getEnd, key, in.sum/in.count, in.count))
  }
)
Kafka is used as the source of the stream. Set the connection details as environment variables in the exec-maven-plugin configuration. For preparing the SensorTag and Raspberry Pi 3 and building the Kafka cluster, refer to the separate setup instructions.
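Because `System.getenv` returns null for an unset variable and `Properties.setProperty` rejects null values, a missing variable surfaces as a bare NullPointerException. The following is a minimal sketch of failing with a clearer message instead. The class and method names are hypothetical, and the values in `main` are placeholders; a real run would pass `System.getenv()` as the map.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Plain-Java sketch; class and method names are hypothetical.
final class EnvConfigSketch {

    // Returns the variable's value, or fails with a message naming the missing variable.
    static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.isEmpty()) {
            throw new IllegalStateException("Missing environment variable: " + name);
        }
        return value;
    }

    // Builds the Kafka consumer properties from the same variables that App.java reads.
    static Properties kafkaProperties(Map<String, String> env) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", require(env, "BOOTSTRAP_SERVERS_CONFIG"));
        props.setProperty("group.id", require(env, "GROUP_ID"));
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values for illustration only.
        Map<String, String> env = new HashMap<>();
        env.put("BOOTSTRAP_SERVERS_CONFIG", "localhost:9092");
        env.put("GROUP_ID", "example-group");
        System.out.println(kafkaProperties(env).getProperty("bootstrap.servers"));
    }
}
```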
Run the exec goal of exec-maven-plugin once the Raspberry Pi 3 is sending SensorTag data to Kafka.
$ mvn clean install exec:exec@App
The average ambient temperature, aggregated over a 60-second tumbling window, is written to standard output.
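To see what a 60-second tumbling window does to the readings, here is a minimal plain-Java simulation for a single sensor. It is a sketch, not the Flink implementation: all names are hypothetical, the sample readings are made up, and it assumes a zero window offset and non-negative timestamps. Each reading falls into the window whose exclusive end is the next multiple of 60,000 ms after its timestamp, and readings are averaged per window.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch (no Flink dependency); all names here are hypothetical.
final class TumblingAverageSketch {
    static final long WINDOW_MILLIS = 60_000L;

    static final class Reading {
        final long timestampMillis;
        final double ambient;

        Reading(long timestampMillis, double ambient) {
            this.timestampMillis = timestampMillis;
            this.ambient = ambient;
        }
    }

    // Exclusive end of the tumbling window that contains the timestamp
    // (assumes zero window offset and non-negative timestamps).
    static long windowEnd(long timestampMillis) {
        long start = timestampMillis - (timestampMillis % WINDOW_MILLIS);
        return start + WINDOW_MILLIS;
    }

    // Average ambient temperature per window end, for the readings of one sensor.
    static Map<Long, Double> averages(List<Reading> readings) {
        Map<Long, double[]> acc = new TreeMap<>(); // value = {sum, count}
        for (Reading r : readings) {
            double[] a = acc.computeIfAbsent(windowEnd(r.timestampMillis), k -> new double[2]);
            a[0] += r.ambient;
            a[1] += 1;
        }
        Map<Long, Double> result = new TreeMap<>();
        for (Map.Entry<Long, double[]> e : acc.entrySet()) {
            result.put(e.getKey(), e.getValue()[0] / e.getValue()[1]);
        }
        return result;
    }

    public static void main(String[] args) {
        // Made-up sample data: two readings in one window, one in the next.
        List<Reading> readings = Arrays.asList(
            new Reading(1500000005000L, 20.0),
            new Reading(1500000030000L, 22.0),
            new Reading(1500000065000L, 30.0));
        System.out.println(averages(readings));
    }
}
```

Unlike this batch simulation, the streaming job emits each window's average only after the watermark passes the end of that window.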