J'ai [essayé] l'agrégation de fenêtres en utilisant Apache Flink et l'API Scala pour les données des capteurs SensorTag (http://qiita.com/masato/items/e88413a48921f35ef4db). API Scala autant que possible [API Java 8](https: // ci. Réécrivez à apache.org/projects/flink/flink-docs-release-1.3/dev/java8.html).
[Flink-quickstart-java](https: /) dans Exemple de projet utilisant l'API Java Utilisez /mvnrepository.com/artifact/org.apache.flink/flink-quickstart-java) pour créer un projet Maven. La version d'Apache Flink est «1.3.2», qui est la même que celle de Scala. Modifiez groupId
et package
en fonction de votre environnement.
$ mkdir -p ~/java_apps && cd ~/java_apps
$ mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.3.2 \
-DgroupId=streams-flink-java-examples \
-DartifactId=streams-flink-java-examples \
-Dversion=0.1 \
-Dpackage=com.github.masato.streams.flink \
-DinteractiveMode=false
Changez le paramètre [maven-compiler-plugin] du plugin (https://maven.apache.org/plugins/maven-compiler-plugin/) en Java 8 (1.8). Ajoutez également exec-maven-plugin pour permettre à Maven d'exécuter l'application Flink main ()
.
pom.xml
<build>
<plugins>
...
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.5.0</version>
<executions>
<execution>
<id>App</id>
<goals>
<goal>exec</goal>
</goals>
</execution>
</executions>
<configuration>
<executable>java</executable>
<classpathScope>compile</classpathScope>
<arguments>
<argument>-cp</argument>
<classpath/>
<argument>com.github.masato.streams.flink.WordCount</argument>
</arguments>
</configuration>
</plugin>
Exécutez l 'objectif d'exécution. WordCount L'exemple de compte les mots dans le texte et les sort en standard.
$ mvn clean package exec:exec@App
...
(is,1)
(a,1)
(in,1)
(mind,1)
(or,2)
(against,1)
(arms,1)
(not,1)
(sea,1)
(the,3)
(troubles,1)
(fortune,1)
(take,1)
(to,4)
(and,1)
(arrows,1)
(be,2)
(nobler,1)
(of,2)
(slings,1)
(suffer,1)
(outrageous,1)
(tis,1)
(whether,1)
(question,1)
(that,1)
Supprimez tout le code Java créé par l'archétype de flink-quickstart-java puis écrivez un nouveau code. Je vais.
$ rm src/main/java/com/github/masato/streams/flink/*.java
Le code source peut également être trouvé dans le référentiel (https://github.com/masato/streams-flink-java-examples). L 'exemple écrit en Scala était un fichier, mais dans le cas de Java, les classes sont séparées pour faciliter la compréhension.
$ tree streams-flink-java-examples
streams-flink-java-examples
├── pom.xml
└── src
└── main
├── java
│ └── com
│ └── github
│ └── masato
│ └── streams
│ └── flink
│ ├── Accumulator.java
│ ├── Aggregate.java
│ ├── App.java
│ ├── Average.java
│ └── Sensor.java
└── resources
└── log4j.properties
App.java
Le texte complet du programme qui implémente la méthode principale. Exemple écrit en Scala [App.scala](https://github.com/masato/streams-flink-scala-examples/blob/master/src/main/scala/com/github/masato/streams/flink/ Similaire à App.scala) mais AggregateFunction /AggregateFunction.html) et [WindowFunction](https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/api/functions/windowing/ WindowFunction.html) est transformé en une classe.
App.java
package com.github.masato.streams.flink;
import java.util.Date;
import java.util.Map;
import java.util.HashMap;
import java.util.Properties;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.JSONDeserializationSchema;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
public class App {
private static DateTimeFormatter fmt = DateTimeFormatter.ISO_OFFSET_DATE_TIME;
private static final String bootstrapServers = System.getenv("BOOTSTRAP_SERVERS_CONFIG");
private static final String groupId = System.getenv("GROUP_ID");
private static final String sourceTopic = System.getenv("SOURCE_TOPIC");
private static final String sinkTopic = System.getenv("SINK_TOPIC");
public static void main(String[] args) throws Exception {
final Properties props = new Properties();
props.setProperty("bootstrap.servers", bootstrapServers);
props.setProperty("group.id", groupId);
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
final DataStream<ObjectNode> events =
env.addSource(new FlinkKafkaConsumer010<>(
sourceTopic,
new JSONDeserializationSchema(),
props)).name("events");
final SingleOutputStreamOperator<ObjectNode> timestamped =
events
.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor<ObjectNode>(Time.seconds(10)) {
@Override
public long extractTimestamp(ObjectNode element) {
return element.get("time").asLong() * 1000;
}
});
timestamped
.map((v) -> {
String key = v.get("bid").asText();
double ambient = v.get("ambient").asDouble();
return new Sensor(key, ambient);
})
.keyBy(v -> v.key)
.timeWindow(Time.seconds(60))
.aggregate(new Aggregate(), new Average())
.map((v) -> {
ZonedDateTime zdt =
new Date(v.time).toInstant().atZone(ZoneId.systemDefault());
String time = fmt.format(zdt);
Map<String, Object> payload = new HashMap<String, Object>();
payload.put("time", time);
payload.put("bid", v.bid);
payload.put("ambient", v.sum);
String retval = new ObjectMapper().writeValueAsString(payload);
System.out.println(retval);
return retval;
})
.addSink(new FlinkKafkaProducer010<String>(
bootstrapServers,
sinkTopic,
new SimpleStringSchema())
).name("kafka");
env.execute();
}
}
Sensor.java
Dans Scala, les données du capteur du flux ont créé un Tuple de Scala en utilisant l'adresse BD comme clé.
timestamped
.map { v =>
val key = v.get("bid").asText
val ambient = v.get("ambient").asDouble
(key, ambient)
}
Même dans le cas de Java 8, Tuple 2 comme Scala /Tuple2.html) peut être utilisé. Cependant, il doit être compilé avec Eclipse JDT comme décrit dans Utilisation d'Apache Flink avec Java 8. .. Ou [Returns ()](https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.html#returns -org.apache.flink.api.common.typeinfo.TypeInformation-) à [TupleTypeInfo](https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache Si vous ne spécifiez pas l'indication de type d'élément dans la classe Java dans /flink/api/java/typeutils/TupleTypeInfo.html), une erreur se produit.
.map((v) -> {
double ambient = v.get("value").get("ambient").asDouble();
String key = v.get("sensor").get("bid").asText();
return new Tuple2<>(key, ambient);
})
.returns(new TupleTypeInfo<>(TypeInformation.of(String.class),
TypeInformation.of(Double.class)))
C'est un peu gênant, il est donc plus facile d'utiliser un POJO ordinaire.
Sensor.java
package com.github.masato.streams.flink;
public class Sensor {
public String key;
public double ambient;
public Sensor(String key, double ambient) {
this.key = key;
this.ambient = ambient;
}
}
Aggregate.java
Implémentation de l'interface AggregateFunction Je vais. Contrairement à Scala, Accumulator n'est pas une classe de cas, mais sinon c'est presque la même chose.
Aggregate.java
package com.github.masato.streams.flink;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.common.functions.AggregateFunction;
public class Aggregate implements AggregateFunction<Sensor, Accumulator, Accumulator> {
private static final long serialVersionUID = 3355966737412029618L;
@Override
public Accumulator createAccumulator() {
return new Accumulator(0L, "", 0.0, 0);
}
@Override
public Accumulator merge(Accumulator a, Accumulator b) {
a.count += b.count;
a.sum += b.sum;
return a;
}
@Override
public void add(Sensor value, Accumulator acc) {
acc.sum += value.ambient;
acc.count++;
}
@Override
public Accumulator getResult(Accumulator acc) {
return acc;
}
}
Average.java
Implémentation de WindowFunction est.
Average.java
package com.github.masato.streams.flink;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.util.Collector;
public class Average implements WindowFunction<Accumulator,
Accumulator, String, TimeWindow> {
private static final long serialVersionUID = 5532466889638450746L;
@Override
public void apply(String key,
TimeWindow window,
Iterable<Accumulator> input,
Collector<Accumulator> out) {
Accumulator in = input.iterator().next();
out.collect(new Accumulator(window.getEnd(), key, in.sum/in.count, in.count));
}
}
Dans le cas de Scala, j'ai écrit l'implémentation ʻapply () de WindowFunction directement dans ʻaggregate
.
.aggregate(new Aggregate(),
( key: String,
window: TimeWindow,
input: Iterable[Accumulator],
out: Collector[Accumulator] ) => {
var in = input.iterator.next()
out.collect(Accumulator(window.getEnd, key, in.sum/in.count, in.count))
}
)
pom.xml
Kafka est utilisé comme source du flux. Définissez les informations de connexion dans la variable d'environnement de exec-maven-plugin. Veuillez vous référer à here pour préparer SensorTag et Raspberry Pi 3 et construire le cluster Kafka.
pom.xml
<build>
<plugins>
...
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.5.0</version>
<executions>
<execution>
<id>App</id>
<goals>
<goal>exec</goal>
</goals>
</execution>
</executions>
<configuration>
<executable>java</executable>
<classpathScope>compile</classpathScope>
<arguments>
<argument>-cp</argument>
<classpath/>
<argument>com.github.masato.streams.flink.App</argument>
</arguments>
<environmentVariables>
<APPLICATION_ID_CONFIG>sensortag</APPLICATION_ID_CONFIG>
<BOOTSTRAP_SERVERS_CONFIG>confluent:9092</BOOTSTRAP_SERVERS_CONFIG>
<SOURCE_TOPIC>sensortag</SOURCE_TOPIC>
<SINK_TOPIC>sensortag-sink</SINK_TOPIC>
<GROUP_ID>flinkGroup</GROUP_ID>
</environmentVariables>
</configuration>
</plugin>
Exécutez l'objectif d'exécution de exec-maven-plugin après l'envoi des données SensorTag de Raspberry Pi 3 à Kafka.
$ mvn clean install exec:exec@App
La valeur moyenne de la température ambiante agrégée dans une fenêtre de culbutage de 60 secondes est sortie en standard.
{"ambient":28.395833333333332,"time":"2017-08-28T11:57:00+09:00","bid":"B0:B4:48:BD:DA:03"}
{"ambient":28.44375,"time":"2017-08-28T11:58:00+09:00","bid":"B0:B4:48:BD:DA:03"}
{"ambient":28.46875,"time":"2017-08-28T11:59:00+09:00","bid":"B0:B4:48:BD:DA:03"}
{"ambient":28.5,"time":"2017-08-28T12:00:00+09:00","bid":"B0:B4:48:BD:DA:03"}
Recommended Posts