[J'ai essayé l'agrégation de fenêtres] en utilisant Jupyter comme environnement d'exploitation pour les données SensorTag dans PySpark Streaming (http://qiita.com/masato/items/574b38e45014a6ae7d88). Il existe plusieurs autres frameworks de traitement de flux, mais j'essaierai ensuite d'utiliser Kafka Streams. Contrairement à Spark, il s'agit d'une bibliothèque, pas d'un cluster. Actuellement, le langage de développement ne prend officiellement en charge que Java.
J'écrirai le code de Eclim construit sur Ubuntu 16.04 avec Maven.
Créez les fichiers suivants dans le répertoire du projet. Le code complet peut être trouvé ici dans le référentiel (https://github.com/masato/streams-kafka-examples).
$ tree
.
├── pom.xml
└── src
└── main
├── java
│ └── com
│ └── github
│ └── masato
│ └── streams
│ └── kafka
│ ├── App.java
│ ├── SensorSumDeserializer.java
│ ├── SensorSum.java
│ └── SensorSumSerializer.java
└── resources
└── logback.xml
9 directories, 7 files
App.java
Je vais expliquer le code en plusieurs parties.
Le nom du sujet, etc. sont obtenus à partir des variables d'environnement définies dans pom.xml. WINDOWS_MINUTES
est l'intervalle d'agrégation de fenêtres. COMMIT_MINUTES
est l'intervalle auquel Kafka valide automatiquement le cache, comme décrit ci-dessous. Ici, spécifiez par minutes.
App.java
public class App {
private static final Logger LOG = LoggerFactory.getLogger(App.class);
private static final String SOURCE_TOPIC = System.getenv("SOURCE_TOPIC");
private static final String SINK_TOPIC = System.getenv("SINK_TOPIC");
private static final long WINDOWS_MINUTES = 2L;
private static final long COMMIT_MINUTES = 3L;
Créez un sérialiseur et un désérialiseur pour l'enregistrement. L'application Kafka Streams enregistre les résultats intermédiaires du processus dans une rubrique et implémente le flux. SerDe est défini en combinant le désérialiseur pour lire les enregistrements d'une rubrique et le sérialiseur pour écrire des enregistrements. SerDe est requis pour chaque clé de rubrique et type de valeur.
jsonSerde Dans l'enregistrement SensorTag, la clé est une chaîne de caractères et la valeur est JsonNode de Jackson. com / plus rapidexml / jackson / databind / JsonNode.html).
sensorSumSerde
SenroSum
est une classe qui contient l'état de la température ambiante et de l'agrégation de fenêtres personnalisées.
stringSerde SerDe pour la chaîne par défaut. Cette fois, les clés de message sont toutes «String».
doubleSerde SerDe pour le double par défaut. La température ambiante (ambiante) du SensorTag est agrégée par fenêtre avec «double».
App.java
public static void main(String[] args) throws Exception {
final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
final Serde<JsonNode> jsonSerde =
Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
final Serializer<SensorSum> sensorSumSerializer =
new SensorSumSerializer();
final Deserializer<SensorSum> sensorSumDeserializer =
new SensorSumDeserializer();
final Serde<SensorSum> sensorSumSerde =
Serdes.serdeFrom(sensorSumSerializer,
sensorSumDeserializer);
final Serde<String> stringSerde = Serdes.String();
final Serde<Double> doubleSerde = Serdes.Double();
Tout d'abord, appelez stream ()
de KStreamBuilder et appelez [KStream](https: // kafka.apache.org/0102/javadoc/org/apache/kafka/streams/kstream/KStream.html). La clé de la rubrique est une chaîne et la valeur est SerDe de JsonNode.
App.java
final KStreamBuilder builder = new KStreamBuilder();
LOG.info("Starting Sorting Job");
final KStream<String, JsonNode> source =
builder.stream(stringSerde, jsonSerde, SOURCE_TOPIC);
Le message SensorTag arrive au sujet Kafka de Raspberry Pi 3 sous forme de chaîne JSON.
{'bid': 'B0:B4:48:BE:5E:00', 'time': 1502152524, 'humidity': 27.26287841796875, 'objecttemp': 21.1875, 'ambient': 27.03125, 'rh': 75.311279296875}
Un enregistrement KStream est un objet KeyValue avec une clé et une valeur. Dans l'exemple, pour agréger uniquement la valeur moyenne de la température ambiante (ambiante) dans la fenêtre, appelez map ()
et créez un nouveau KStream avec uniquement la paire clé et température ambiante.
Appelez ensuite groupByKey ()
et groupez par clé pour créer KGroupedStream .. Étant donné que la clé de l'enregistrement est une chaîne de caractères et que la valeur est «double» de la température ambiante, spécifiez chaque SerDe.
App.java
final KGroupedStream<String, Double> sensors =
source
.map((k, v) -> {
double ambient = v.get("ambient").asDouble();
return KeyValue.pair(k, ambient);
})
.groupByKey(stringSerde, doubleSerde);
Appelez ʻaggregate () `de KGroupedStream pour créer KTable. KTable conserve la valeur totale des enregistrements et le nombre d'enregistrements à l'intervalle de fenêtre spécifié pour chaque clé.
Dans le premier argument de ʻaggregate () , [Initializer](https://kafka.apache.org/0102/javadoc/org/apache/kafka/streams/kstream/Initializer.html), l'agrégateur utilisé pour l'agrégation de flux Initialiser. Ici, le
SensorSumqui contient l'état d'agrégation de fenêtre est initialisé. Implémentez l'agrégateur avec le deuxième argument. La clé et la valeur de l'enregistrement actuel et le «SensorSum» créés dans le processus d'enregistrement précédent sont transmis. Renvoie un nouveau
SensorSumen ajoutant la valeur totale et le nombre d'enregistrements à chaque fois que les données arrivent. Le troisième argument définit la fenêtre de 2 minutes [TimeWindows](http://apache.mesi.com.ar/kafka/0.10.2.0/javadoc/org/apache/kafka/streams/kstream/TimeWindows.html) .. Le 4ème argument est SerDe de
SensorSum`, et le 5ème argument est le nom de rubrique qui contient l'état.
App.java
final KTable<Windowed<String>, SensorSum> sensorAgg =
sensors
.aggregate(() -> new SensorSum(0D, 0)
, (aggKey, value, agg) -> new SensorSum(agg.sum + value, agg.count + 1)
, TimeWindows.of(TimeUnit.MINUTES.toMillis(WINDOWS_MINUTES))
, sensorSumSerde,
"sensorSum");
Calculez la valeur moyenne avec mapValues ()
de KTable. La valeur moyenne de la valeur totale divisée par le nombre d'enregistrements est le nouveau KTable pour l'enregistrement «Double». De là, appelez toStream ()
pour créer un KStream. L'enregistrement est formaté dans une chaîne JSON d'horodatages, de clés et de moyennes et est généré dans le flux. L'horodatage est défini sur ISO 8601 afin que les données puissent être facilement échangées entre différents systèmes. Enregistrez l'enregistrement dans le dernier sujet spécifié et terminez.
App.java
final DateTimeFormatter fmt =
DateTimeFormatter.ISO_OFFSET_DATE_TIME;
sensorAgg
.<Double>mapValues((v) -> ((double) v.sum / v.count))
.toStream()
.map((key, avg) -> {
long end = key.window().end();
ZonedDateTime zdt =
new Date(end).toInstant()
.atZone(ZoneId.systemDefault());
String time = fmt.format(zdt);
String bid = key.key();
String retval =
String.format("{\"time\": \"%s\", \"bid\": \"%s\", \"ambient\": %f}",
time, bid, avg);
LOG.info(retval);
return new KeyValue<String,String>(bid, retval);
})
.to(SINK_TOPIC);
Créez un Kafka Streams (https://kafka.apache.org/0102/javadoc/org/apache/kafka/streams/KafkaStreams.html) à partir des objets de configuration et du générateur pour démarrer l'application Kafka Streams. Enregistrez-le également dans le hook d'arrêt pour arrêter Kafka Stream avec SIGTERM
.
App.java
final StreamsConfig config = new StreamsConfig(getProperties());
final KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
Créez des Propriétés
à utiliser dans les paramètres de Kafka Streams à partir de variables d'environnement.
App.java
private static Properties getProperties() {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG,
System.getenv("APPLICATION_ID_CONFIG"));
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
System.getenv("BOOTSTRAP_SERVERS_CONFIG"));
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
WallclockTimestampExtractor.class.getName());
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
TimeUnit.MINUTES.toMillis(COMMIT_MINUTES));
return props;
}
COMMIT_INTERVAL_MS_CONFIG
Au départ, je n'ai pas changé StreamsConfig.COMMIT_INTERVAL_MS_CONFIG. Le journal est généré par map () de KStream avant de sauvegarder l'enregistrement en tant que sujet. Je voulais sortir le résultat agrégé de l'intervalle de fenêtre de 2 minutes une seule fois à la fin, mais le résultat n'était pas spécifié 4 à 5 fois.
{"time": "2017-08-08T10:34:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.343750}
{"time": "2017-08-08T10:34:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.385417}
{"time": "2017-08-08T10:34:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.410156}
{"time": "2017-08-08T10:34:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.440341}
{"time": "2017-08-08T10:34:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.450521}
{"time": "2017-08-08T10:36:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.562500}
{"time": "2017-08-08T10:36:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.562500}
En référence aux articles suivants, cela semble être le comportement attendu en raison de la fonctionnalité de flux de journal des modifications de KTable. Il n'y a pas de résultat final de l'agrégation de fenêtres dans KTable et les valeurs mises à jour dans le cache sont validées à intervalles réguliers. Il semble que vous deviez implémenter votre propre code pour supprimer les enregistrements en double en utilisant transform ()
et process ()
après toStream ()
dans KStream.
Vous ne pouvez pas éliminer complètement les enregistrements en double, mais vous pouvez réduire le nombre de validations de cache en augmentant la valeur de StreamsConfig.COMMIT_INTERVAL_MS_CONFIG
. 30 secondes sont spécifiées pour Par défaut.
Préparez les classes de modèle (SensorSum.java), de sérialiseur (SensorSumSerializer.java) et de désérialiseur (SensorSumDeserializer). Le sérialiseur implémente serialize ()
pour convertir les propriétés de SensorSum
en un tableau d'octets. Allouez 8 octets de «Double» de la valeur totale de la température ambiante et 4 octets de «Nombre» du nombre d'enregistrements à utiliser dans le tampon d'octets.
SensorSumSerializer.java
public byte[] serialize(String topic, SensorSum data) {
ByteBuffer buffer = ByteBuffer.allocate(8 + 4);
buffer.putDouble(data.sum);
buffer.putInt(data.count);
return buffer.array();
}
Exécutez Kafka Streams à partir du plugin Exec Maven (http://www.mojohaus.org/exec-maven-plugin/).
$ mvn clean install exec:exec@json
J'ai défini l'intervalle de fenêtre à 2 minutes et l'intervalle de validation du cache à 3 minutes. Après tout, il y a plusieurs fois des sorties dupliquées, mais j'ai pu réduire les sorties dupliquées.
{"time": "2017-08-08T11:32:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.414773}
{"time": "2017-08-08T11:34:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.414063}
{"time": "2017-08-08T11:36:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.453125}
{"time": "2017-08-08T11:36:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.476563}
{"time": "2017-08-08T11:38:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.546875}