Ich habe die Fensteraggregation mit der Apache Flink- und Scala-API für SensorTag-Sensordaten (http://qiita.com/masato/items/e88413a48921f35ef4db) versucht. Scala API so viel wie möglich [Java 8 API](https: // ci. Schreiben Sie unter apache.org/projects/flink/flink-docs-release-1.3/dev/java8.html neu.
[Flink-quickstart-java](https: /) in Beispielprojekt mit der Java-API Verwenden Sie /mvnrepository.com/artifact/org.apache.flink/flink-quickstart-java), um ein Maven-Projekt zu erstellen. Die Version von Apache Flink ist "1.3.2" und entspricht der von Scala. Ändern Sie groupId
und package
entsprechend Ihrer Umgebung.
$ mkdir -p ~/java_apps && cd ~/java_apps
$ mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.3.2 \
-DgroupId=streams-flink-java-examples \
-DartifactId=streams-flink-java-examples \
-Dversion=0.1 \
-Dpackage=com.github.masato.streams.flink \
-DinteractiveMode=false
Ändern Sie die Einstellung maven-compiler-plugin des Plugins in Java 8 (1.8). Fügen Sie außerdem exec-maven-plugin hinzu, damit Maven die Flink-App main ()
ausführen kann.
pom.xml
<build>
<plugins>
...
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.5.0</version>
<executions>
<execution>
<id>App</id>
<goals>
<goal>exec</goal>
</goals>
</execution>
</executions>
<configuration>
<executable>java</executable>
<classpathScope>compile</classpathScope>
<arguments>
<argument>-cp</argument>
<classpath/>
<argument>com.github.masato.streams.flink.WordCount</argument>
</arguments>
</configuration>
</plugin>
Führen Sie das [Ausführungsziel] aus (http://www.mojohaus.org/exec-maven-plugin/examples/example-exec-for-java-programs.html). WordCount Das Beispiel zählt die Wörter im Text und gibt sie standardmäßig aus.
$ mvn clean package exec:exec@App
...
(is,1)
(a,1)
(in,1)
(mind,1)
(or,2)
(against,1)
(arms,1)
(not,1)
(sea,1)
(the,3)
(troubles,1)
(fortune,1)
(take,1)
(to,4)
(and,1)
(arrows,1)
(be,2)
(nobler,1)
(of,2)
(slings,1)
(suffer,1)
(outrageous,1)
(tis,1)
(whether,1)
(question,1)
(that,1)
Löschen Sie den gesamten Java-Code, der vom Archetyp flink-quickstart-java erstellt wurde, und schreiben Sie dann neuen Code. Ich werde.
$ rm src/main/java/com/github/masato/streams/flink/*.java
Der Quellcode befindet sich auch im Repository (https://github.com/masato/streams-flink-java-examples). Das in Scala geschriebene Beispiel war eine Datei, aber im Fall von Java sind die Klassen zum besseren Verständnis getrennt.
$ tree streams-flink-java-examples
streams-flink-java-examples
├── pom.xml
└── src
└── main
├── java
│ └── com
│ └── github
│ └── masato
│ └── streams
│ └── flink
│ ├── Accumulator.java
│ ├── Aggregate.java
│ ├── App.java
│ ├── Average.java
│ └── Sensor.java
└── resources
└── log4j.properties
App.java
Der vollständige Text des Programms, das die Hauptmethode implementiert. Beispiel [App.scala] in Scala geschrieben (https://github.com/masato/streams-flink-scala-examples/blob/master/src/main/scala/com/github/masato/streams/flink/ Ähnlich wie App.scala), aber AggregateFunction /AggregateFunction.html) und [WindowFunction](https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/api/functions/windowing/ WindowFunction.html) wird zu einer Klasse gemacht.
App.java
package com.github.masato.streams.flink;
import java.util.Date;
import java.util.Map;
import java.util.HashMap;
import java.util.Properties;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.JSONDeserializationSchema;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
public class App {
private static DateTimeFormatter fmt = DateTimeFormatter.ISO_OFFSET_DATE_TIME;
private static final String bootstrapServers = System.getenv("BOOTSTRAP_SERVERS_CONFIG");
private static final String groupId = System.getenv("GROUP_ID");
private static final String sourceTopic = System.getenv("SOURCE_TOPIC");
private static final String sinkTopic = System.getenv("SINK_TOPIC");
public static void main(String[] args) throws Exception {
final Properties props = new Properties();
props.setProperty("bootstrap.servers", bootstrapServers);
props.setProperty("group.id", groupId);
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
final DataStream<ObjectNode> events =
env.addSource(new FlinkKafkaConsumer010<>(
sourceTopic,
new JSONDeserializationSchema(),
props)).name("events");
final SingleOutputStreamOperator<ObjectNode> timestamped =
events
.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor<ObjectNode>(Time.seconds(10)) {
@Override
public long extractTimestamp(ObjectNode element) {
return element.get("time").asLong() * 1000;
}
});
timestamped
.map((v) -> {
String key = v.get("bid").asText();
double ambient = v.get("ambient").asDouble();
return new Sensor(key, ambient);
})
.keyBy(v -> v.key)
.timeWindow(Time.seconds(60))
.aggregate(new Aggregate(), new Average())
.map((v) -> {
ZonedDateTime zdt =
new Date(v.time).toInstant().atZone(ZoneId.systemDefault());
String time = fmt.format(zdt);
Map<String, Object> payload = new HashMap<String, Object>();
payload.put("time", time);
payload.put("bid", v.bid);
payload.put("ambient", v.sum);
String retval = new ObjectMapper().writeValueAsString(payload);
System.out.println(retval);
return retval;
})
.addSink(new FlinkKafkaProducer010<String>(
bootstrapServers,
sinkTopic,
new SimpleStringSchema())
).name("kafka");
env.execute();
}
}
Sensor.java
In Scala haben die Sensordaten des Streams ein Tupel von Scala erstellt, wobei die BD-Adresse als Schlüssel verwendet wurde.
timestamped
.map { v =>
val key = v.get("bid").asText
val ambient = v.get("ambient").asDouble
(key, ambient)
}
Selbst im Fall von Java 8 mag Tuple 2 Scala /Tuple2.html) kann verwendet werden. Es muss jedoch mit Eclipse JDT kompiliert werden, wie unter Verwenden von Apache Flink mit Java 8 beschrieben. .. Oder [return ()](https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.html#returns -org.apache.flink.api.common.typeinfo.TypeInformation-) bis [TupleTypeInfo](https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache Wenn Sie den Elementtyp-Hinweis in der Java-Klasse in /flink/api/java/typeutils/TupleTypeInfo.html) nicht angeben, tritt ein Fehler auf.
.map((v) -> {
double ambient = v.get("value").get("ambient").asDouble();
String key = v.get("sensor").get("bid").asText();
return new Tuple2<>(key, ambient);
})
.returns(new TupleTypeInfo<>(TypeInformation.of(String.class),
TypeInformation.of(Double.class)))
Es ist ein wenig mühsam, daher ist es einfacher, gewöhnliches POJO zu verwenden.
Sensor.java
package com.github.masato.streams.flink;
public class Sensor {
public String key;
public double ambient;
public Sensor(String key, double ambient) {
this.key = key;
this.ambient = ambient;
}
}
Aggregate.java
Implementierung der Schnittstelle AggregateFunction Ich werde. Im Gegensatz zu Scala ist Accumulator keine Fallklasse, aber ansonsten ist es fast dieselbe.
Aggregate.java
package com.github.masato.streams.flink;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.common.functions.AggregateFunction;
public class Aggregate implements AggregateFunction<Sensor, Accumulator, Accumulator> {
private static final long serialVersionUID = 3355966737412029618L;
@Override
public Accumulator createAccumulator() {
return new Accumulator(0L, "", 0.0, 0);
}
@Override
public Accumulator merge(Accumulator a, Accumulator b) {
a.count += b.count;
a.sum += b.sum;
return a;
}
@Override
public void add(Sensor value, Accumulator acc) {
acc.sum += value.ambient;
acc.count++;
}
@Override
public Accumulator getResult(Accumulator acc) {
return acc;
}
}
Average.java
Implementierung von WindowFunction ist.
Average.java
package com.github.masato.streams.flink;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.util.Collector;
public class Average implements WindowFunction<Accumulator,
Accumulator, String, TimeWindow> {
private static final long serialVersionUID = 5532466889638450746L;
@Override
public void apply(String key,
TimeWindow window,
Iterable<Accumulator> input,
Collector<Accumulator> out) {
Accumulator in = input.iterator().next();
out.collect(new Accumulator(window.getEnd(), key, in.sum/in.count, in.count));
}
}
Im Fall von Scala habe ich die apply ()
Implementierung von WindowFunction direkt in aggregat
geschrieben.
.aggregate(new Aggregate(),
( key: String,
window: TimeWindow,
input: Iterable[Accumulator],
out: Collector[Accumulator] ) => {
var in = input.iterator.next()
out.collect(Accumulator(window.getEnd, key, in.sum/in.count, in.count))
}
)
pom.xml
Kafka wird als Quelle des Streams verwendet. Legen Sie die Verbindungsinformationen in der Umgebungsvariablen von [exec-maven-plugin] fest (http://www.mojohaus.org/exec-maven-plugin/). Informationen zur Vorbereitung von SensorTag und Raspberry Pi 3 sowie zum Erstellen eines Kafka-Clusters finden Sie unter hier.
pom.xml
<build>
<plugins>
...
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.5.0</version>
<executions>
<execution>
<id>App</id>
<goals>
<goal>exec</goal>
</goals>
</execution>
</executions>
<configuration>
<executable>java</executable>
<classpathScope>compile</classpathScope>
<arguments>
<argument>-cp</argument>
<classpath/>
<argument>com.github.masato.streams.flink.App</argument>
</arguments>
<environmentVariables>
<APPLICATION_ID_CONFIG>sensortag</APPLICATION_ID_CONFIG>
<BOOTSTRAP_SERVERS_CONFIG>confluent:9092</BOOTSTRAP_SERVERS_CONFIG>
<SOURCE_TOPIC>sensortag</SOURCE_TOPIC>
<SINK_TOPIC>sensortag-sink</SINK_TOPIC>
<GROUP_ID>flinkGroup</GROUP_ID>
</environmentVariables>
</configuration>
</plugin>
Führen Sie das Exec-Ziel des Exec-Maven-Plugins aus, nachdem Sie SensorTag-Daten von Raspberry Pi 3 an Kafka gesendet haben.
$ mvn clean install exec:exec@App
Der Durchschnittswert der Umgebungstemperatur, die in einem 60-Sekunden-Taumelfenster aggregiert wird, wird standardmäßig ausgegeben.
{"ambient":28.395833333333332,"time":"2017-08-28T11:57:00+09:00","bid":"B0:B4:48:BD:DA:03"}
{"ambient":28.44375,"time":"2017-08-28T11:58:00+09:00","bid":"B0:B4:48:BD:DA:03"}
{"ambient":28.46875,"time":"2017-08-28T11:59:00+09:00","bid":"B0:B4:48:BD:DA:03"}
{"ambient":28.5,"time":"2017-08-28T12:00:00+09:00","bid":"B0:B4:48:BD:DA:03"}
Recommended Posts