[JAVA] Agrégation de fenêtres de SensorTag avec Kafka Streams

[J'ai essayé l'agrégation de fenêtres] en utilisant Jupyter comme environnement d'exploitation pour les données SensorTag dans PySpark Streaming (http://qiita.com/masato/items/574b38e45014a6ae7d88). Il existe plusieurs autres frameworks de traitement de flux, mais j'essaierai ensuite d'utiliser Kafka Streams. Contrairement à Spark, il s'agit d'une bibliothèque, pas d'un cluster. Actuellement, le langage de développement ne prend officiellement en charge que Java.

Environnement Java

J'écrirai le code de Eclim construit sur Ubuntu 16.04 avec Maven.

projet

Créez les fichiers suivants dans le répertoire du projet. Le code complet peut être trouvé ici dans le référentiel (https://github.com/masato/streams-kafka-examples).

$  tree
.
├── pom.xml
└── src
    └── main
        ├── java
        │   └── com
        │       └── github
        │           └── masato
        │               └── streams
        │                   └── kafka
        │                       ├── App.java
        │                       ├── SensorSumDeserializer.java
        │                       ├── SensorSum.java
        │                       └── SensorSumSerializer.java
        └── resources
            └── logback.xml

9 directories, 7 files

App.java

Je vais expliquer le code en plusieurs parties.

constant

Le nom du sujet, etc. sont obtenus à partir des variables d'environnement définies dans pom.xml. WINDOWS_MINUTES est l'intervalle d'agrégation de fenêtres. COMMIT_MINUTES est l'intervalle auquel Kafka valide automatiquement le cache, comme décrit ci-dessous. Ici, spécifiez par minutes.

App.java


public class App {

    private static final Logger LOG = LoggerFactory.getLogger(App.class);

    private static final String SOURCE_TOPIC = System.getenv("SOURCE_TOPIC");
    private static final String SINK_TOPIC = System.getenv("SINK_TOPIC");
    private static final long WINDOWS_MINUTES = 2L;
    private static final long COMMIT_MINUTES = 3L;

Sérialisation

Créez un sérialiseur et un désérialiseur pour l'enregistrement. L'application Kafka Streams enregistre les résultats intermédiaires du processus dans une rubrique et implémente le flux. SerDe est défini en combinant le désérialiseur pour lire les enregistrements d'une rubrique et le sérialiseur pour écrire des enregistrements. SerDe est requis pour chaque clé de rubrique et type de valeur.

App.java


    public static void main(String[] args) throws Exception {

        final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
        final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
        final Serde<JsonNode> jsonSerde =
            Serdes.serdeFrom(jsonSerializer, jsonDeserializer);

        final Serializer<SensorSum> sensorSumSerializer =
            new SensorSumSerializer();
        final Deserializer<SensorSum> sensorSumDeserializer =
            new SensorSumDeserializer();
        final Serde<SensorSum> sensorSumSerde =
            Serdes.serdeFrom(sensorSumSerializer,
                             sensorSumDeserializer);

        final Serde<String> stringSerde = Serdes.String();
        final Serde<Double> doubleSerde = Serdes.Double();

Créer un KStream

Tout d'abord, appelez stream () de KStreamBuilder et appelez [KStream](https: // kafka.apache.org/0102/javadoc/org/apache/kafka/streams/kstream/KStream.html). La clé de la rubrique est une chaîne et la valeur est SerDe de JsonNode.

App.java


        final KStreamBuilder builder = new KStreamBuilder();

        LOG.info("Starting Sorting Job");

        final KStream<String, JsonNode> source =
            builder.stream(stringSerde, jsonSerde, SOURCE_TOPIC);

Créer KGroupedStream

Le message SensorTag arrive au sujet Kafka de Raspberry Pi 3 sous forme de chaîne JSON.

{'bid': 'B0:B4:48:BE:5E:00', 'time': 1502152524, 'humidity': 27.26287841796875, 'objecttemp': 21.1875, 'ambient': 27.03125, 'rh': 75.311279296875}

Un enregistrement KStream est un objet KeyValue avec une clé et une valeur. Dans l'exemple, pour agréger uniquement la valeur moyenne de la température ambiante (ambiante) dans la fenêtre, appelez map () et créez un nouveau KStream avec uniquement la paire clé et température ambiante.

Appelez ensuite groupByKey () et groupez par clé pour créer KGroupedStream .. Étant donné que la clé de l'enregistrement est une chaîne de caractères et que la valeur est «double» de la température ambiante, spécifiez chaque SerDe.

App.java


        final KGroupedStream<String, Double> sensors =
            source
            .map((k, v) -> {
                    double ambient = v.get("ambient").asDouble();
                    return KeyValue.pair(k, ambient);
                })
            .groupByKey(stringSerde, doubleSerde);

Créer KTable à partir de KStram

Appelez ʻaggregate () `de KGroupedStream pour créer KTable. KTable conserve la valeur totale des enregistrements et le nombre d'enregistrements à l'intervalle de fenêtre spécifié pour chaque clé.

Dans le premier argument de ʻaggregate () , [Initializer](https://kafka.apache.org/0102/javadoc/org/apache/kafka/streams/kstream/Initializer.html), l'agrégateur utilisé pour l'agrégation de flux Initialiser. Ici, le SensorSumqui contient l'état d'agrégation de fenêtre est initialisé. Implémentez l'agrégateur avec le deuxième argument. La clé et la valeur de l'enregistrement actuel et le «SensorSum» créés dans le processus d'enregistrement précédent sont transmis. Renvoie un nouveauSensorSumen ajoutant la valeur totale et le nombre d'enregistrements à chaque fois que les données arrivent. Le troisième argument définit la fenêtre de 2 minutes [TimeWindows](http://apache.mesi.com.ar/kafka/0.10.2.0/javadoc/org/apache/kafka/streams/kstream/TimeWindows.html) .. Le 4ème argument est SerDe deSensorSum`, et le 5ème argument est le nom de rubrique qui contient l'état.

App.java


        final KTable<Windowed<String>, SensorSum> sensorAgg =
            sensors
            .aggregate(() -> new SensorSum(0D, 0)
                       , (aggKey, value, agg) -> new SensorSum(agg.sum + value, agg.count + 1)
                       , TimeWindows.of(TimeUnit.MINUTES.toMillis(WINDOWS_MINUTES))
                       , sensorSumSerde,
                       "sensorSum");

Créer Kstram à partir de KTable

Calculez la valeur moyenne avec mapValues () de KTable. La valeur moyenne de la valeur totale divisée par le nombre d'enregistrements est le nouveau KTable pour l'enregistrement «Double». De là, appelez toStream () pour créer un KStream. L'enregistrement est formaté dans une chaîne JSON d'horodatages, de clés et de moyennes et est généré dans le flux. L'horodatage est défini sur ISO 8601 afin que les données puissent être facilement échangées entre différents systèmes. Enregistrez l'enregistrement dans le dernier sujet spécifié et terminez.

App.java


        final DateTimeFormatter fmt =
            DateTimeFormatter.ISO_OFFSET_DATE_TIME;

        sensorAgg
            .<Double>mapValues((v) -> ((double) v.sum / v.count))
            .toStream()
            .map((key, avg) -> {
                    long end = key.window().end();
                    ZonedDateTime zdt =
                        new Date(end).toInstant()
                        .atZone(ZoneId.systemDefault());
                    String time = fmt.format(zdt);
                    String bid = key.key();
                    String retval =
                        String.format("{\"time\": \"%s\", \"bid\": \"%s\", \"ambient\": %f}",
                                      time, bid, avg);
                    LOG.info(retval);
                    return new KeyValue<String,String>(bid, retval);
             })
            .to(SINK_TOPIC);

Début des flux Kafka

Créez un Kafka Streams (https://kafka.apache.org/0102/javadoc/org/apache/kafka/streams/KafkaStreams.html) à partir des objets de configuration et du générateur pour démarrer l'application Kafka Streams. Enregistrez-le également dans le hook d'arrêt pour arrêter Kafka Stream avec SIGTERM.

App.java


        final StreamsConfig config = new StreamsConfig(getProperties());
        final KafkaStreams streams = new KafkaStreams(builder, config);
        streams.start();
        
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

À propos des paramètres et du délai d'expiration de Kafka Streams

Créez des Propriétés à utiliser dans les paramètres de Kafka Streams à partir de variables d'environnement.

App.java


    private static Properties getProperties() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG,
                  System.getenv("APPLICATION_ID_CONFIG"));
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
                  System.getenv("BOOTSTRAP_SERVERS_CONFIG"));
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
                  Serdes.String().getClass().getName());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
                  Serdes.String().getClass().getName());
        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                  WallclockTimestampExtractor.class.getName());
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
                  TimeUnit.MINUTES.toMillis(COMMIT_MINUTES));

        return props;
    }

COMMIT_INTERVAL_MS_CONFIG

Au départ, je n'ai pas changé StreamsConfig.COMMIT_INTERVAL_MS_CONFIG. Le journal est généré par map () de KStream avant de sauvegarder l'enregistrement en tant que sujet. Je voulais sortir le résultat agrégé de l'intervalle de fenêtre de 2 minutes une seule fois à la fin, mais le résultat n'était pas spécifié 4 à 5 fois.

{"time": "2017-08-08T10:34:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.343750}
{"time": "2017-08-08T10:34:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.385417}
{"time": "2017-08-08T10:34:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.410156}
{"time": "2017-08-08T10:34:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.440341}
{"time": "2017-08-08T10:34:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.450521}
{"time": "2017-08-08T10:36:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.562500}
{"time": "2017-08-08T10:36:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.562500}

En référence aux articles suivants, cela semble être le comportement attendu en raison de la fonctionnalité de flux de journal des modifications de KTable. Il n'y a pas de résultat final de l'agrégation de fenêtres dans KTable et les valeurs mises à jour dans le cache sont validées à intervalles réguliers. Il semble que vous deviez implémenter votre propre code pour supprimer les enregistrements en double en utilisant transform () et process () après toStream () dans KStream.

Vous ne pouvez pas éliminer complètement les enregistrements en double, mais vous pouvez réduire le nombre de validations de cache en augmentant la valeur de StreamsConfig.COMMIT_INTERVAL_MS_CONFIG. 30 secondes sont spécifiées pour Par défaut.

Autres classes

Préparez les classes de modèle (SensorSum.java), de sérialiseur (SensorSumSerializer.java) et de désérialiseur (SensorSumDeserializer). Le sérialiseur implémente serialize () pour convertir les propriétés de SensorSum en un tableau d'octets. Allouez 8 octets de «Double» de la valeur totale de la température ambiante et 4 octets de «Nombre» du nombre d'enregistrements à utiliser dans le tampon d'octets.

SensorSumSerializer.java


    public byte[] serialize(String topic, SensorSum data) {
        ByteBuffer buffer = ByteBuffer.allocate(8 + 4);
        buffer.putDouble(data.sum);
        buffer.putInt(data.count);

        return buffer.array();
    }

Courir

Exécutez Kafka Streams à partir du plugin Exec Maven (http://www.mojohaus.org/exec-maven-plugin/).

$ mvn clean install exec:exec@json

J'ai défini l'intervalle de fenêtre à 2 minutes et l'intervalle de validation du cache à 3 minutes. Après tout, il y a plusieurs fois des sorties dupliquées, mais j'ai pu réduire les sorties dupliquées.

{"time": "2017-08-08T11:32:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.414773}
{"time": "2017-08-08T11:34:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.414063}
{"time": "2017-08-08T11:36:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.453125}
{"time": "2017-08-08T11:36:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.476563}
{"time": "2017-08-08T11:38:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.546875}

Recommended Posts

Agrégation de fenêtres de SensorTag avec Kafka Streams
Agrégation de fenêtres de données de capteurs avec Apache Flink et Java 8