[JAVA] Fensteraggregation von SensorTag mit Kafka Streams

[Ich habe versucht, Fenster zu aggregieren] mit Jupyter als Betriebsumgebung für SensorTag-Daten in PySpark Streaming (http://qiita.com/masato/items/574b38e45014a6ae7d88). Es gibt mehrere andere Stream-Verarbeitungs-Frameworks, aber als Nächstes werde ich versuchen, Kafka-Streams zu verwenden (http://docs.confluent.io/current/streams/index.html). Im Gegensatz zu Spark ist dies eine Bibliothek, kein Cluster. Derzeit unterstützt die Entwicklungssprache offiziell nur Java.

Java-Umgebung

Ich werde den Code von Eclim schreiben, der auf Ubuntu 16.04 mit Maven basiert.

Projekt

Erstellen Sie die folgenden Dateien im Projektverzeichnis. Den vollständigen Code finden Sie hier im Repository (https://github.com/masato/streams-kafka-examples).

$  tree
.
├── pom.xml
└── src
    └── main
        ├── java
        │   └── com
        │       └── github
        │           └── masato
        │               └── streams
        │                   └── kafka
        │                       ├── App.java
        │                       ├── SensorSumDeserializer.java
        │                       ├── SensorSum.java
        │                       └── SensorSumSerializer.java
        └── resources
            └── logback.xml

9 directories, 7 files

App.java

Ich werde den Code in mehreren Teilen erklären.

Konstante

Der Themenname usw. wird aus den in pom.xml definierten Umgebungsvariablen abgerufen. WINDOWS_MINUTES ist das Intervall für die Fensteraggregation. COMMIT_MINUTES ist das Intervall, in dem Kafka den Cache automatisch festschreibt, wie unten beschrieben. Hier in Minuten angeben.

App.java


public class App {

    private static final Logger LOG = LoggerFactory.getLogger(App.class);

    private static final String SOURCE_TOPIC = System.getenv("SOURCE_TOPIC");
    private static final String SINK_TOPIC = System.getenv("SINK_TOPIC");
    private static final long WINDOWS_MINUTES = 2L;
    private static final long COMMIT_MINUTES = 3L;

Serialisierung

Erstellen Sie einen Serializer und Deserializer für den Datensatz. Die Kafka Streams-App speichert die Zwischenergebnisse des Prozesses in einem Thema und implementiert den Ablauf. SerDe wird definiert, indem der Deserializer zum Lesen von Datensätzen aus einem Thema und der Serializer zum Schreiben von Datensätzen kombiniert werden. SerDe ist für jeden Themenschlüssel und Werttyp erforderlich.

App.java


    public static void main(String[] args) throws Exception {

        final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
        final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
        final Serde<JsonNode> jsonSerde =
            Serdes.serdeFrom(jsonSerializer, jsonDeserializer);

        final Serializer<SensorSum> sensorSumSerializer =
            new SensorSumSerializer();
        final Deserializer<SensorSum> sensorSumDeserializer =
            new SensorSumDeserializer();
        final Serde<SensorSum> sensorSumSerde =
            Serdes.serdeFrom(sensorSumSerializer,
                             sensorSumDeserializer);

        final Serde<String> stringSerde = Serdes.String();
        final Serde<Double> doubleSerde = Serdes.Double();

Erstellen eines KStreams

Rufen Sie zuerst "stream ()" von KStreamBuilder auf und rufen Sie [KStream](https: // kafka.apache.org/0102/javadoc/org/apache/kafka/streams/kstream/KStream.html). Der Schlüssel des Themas ist eine Zeichenfolge, und der Wert ist SerDe von JsonNode.

App.java


        final KStreamBuilder builder = new KStreamBuilder();

        LOG.info("Starting Sorting Job");

        final KStream<String, JsonNode> source =
            builder.stream(stringSerde, jsonSerde, SOURCE_TOPIC);

Erstellen Sie KGroupedStream

Die SensorTag-Nachricht kommt von Raspberry Pi 3 als JSON-Zeichenfolge zum Kafka-Thema.

{'bid': 'B0:B4:48:BE:5E:00', 'time': 1502152524, 'humidity': 27.26287841796875, 'objecttemp': 21.1875, 'ambient': 27.03125, 'rh': 75.311279296875}

Ein KStream-Datensatz ist ein [KeyValue] -Objekt (https://kafka.apache.org/0102/javadoc/org/apache/kafka/streams/KeyValue.html) mit einem Schlüssel und einem Wert. Um im Beispiel nur den Durchschnittswert der Umgebungstemperatur (Umgebungstemperatur) im Fenster zu aggregieren, rufen Sie "map ()" auf und erstellen Sie einen neuen KStream mit nur dem Paar aus Schlüssel und Umgebungstemperatur.

Rufen Sie dann groupByKey () und group by key auf, um [KGroupedStream] zu erstellen (https://kafka.apache.org/0102/javadoc/org/apache/kafka/streams/kstream/KGroupedStream.html). .. Da der Schlüssel des Datensatzes eine Zeichenfolge ist und der Wert "doppelt" der Umgebungstemperatur ist, geben Sie jede SerDe an.

App.java


        final KGroupedStream<String, Double> sensors =
            source
            .map((k, v) -> {
                    double ambient = v.get("ambient").asDouble();
                    return KeyValue.pair(k, ambient);
                })
            .groupByKey(stringSerde, doubleSerde);

Erstellen Sie eine KTable aus KStram

Rufen Sie aggregat () von KGroupedStream auf, um [KTable] zu erstellen (https://kafka.apache.org/0102/javadoc/org/apache/kafka/streams/kstream/KTable.html). KTable speichert den Gesamtwert der Datensätze und den Status der Anzahl der Datensätze in dem für jeden Schlüssel angegebenen Fensterintervall.

Im ersten Argument von aggregat (), Initializer, dem für die Stream-Aggregation verwendeten Aggregator Initialisieren. Hier wird die "SensorSum" initialisiert, die den Status der Fensteraggregation enthält. Implementieren Sie den Aggregator mit dem zweiten Argument. Der Schlüssel und der Wert des aktuellen Datensatzes und die im vorherigen Datensatzprozess erstellte "Sensorsumme" werden übergeben. Gibt eine neue "Sensorsumme" zurück, indem der Gesamtwert und die Anzahl der Datensätze jedes Mal addiert werden, wenn Daten eintreffen. Das dritte Argument definiert das 2-Minuten-Fenster TimeWindows. .. Das vierte Argument ist SerDe von "SensorSum", und das fünfte Argument ist der Themenname, der den Status enthält.

App.java


        final KTable<Windowed<String>, SensorSum> sensorAgg =
            sensors
            .aggregate(() -> new SensorSum(0D, 0)
                       , (aggKey, value, agg) -> new SensorSum(agg.sum + value, agg.count + 1)
                       , TimeWindows.of(TimeUnit.MINUTES.toMillis(WINDOWS_MINUTES))
                       , sensorSumSerde,
                       "sensorSum");

Erstellen Sie Kstram aus KTable

Berechnen Sie den Durchschnittswert mit mapValues () von KTable. Der Durchschnitt der Summe geteilt durch die Anzahl der Datensätze ist die neue KTable für den "Double" -Datensatz. Rufen Sie von hier aus toStream () auf, um einen KStream zu erstellen. Der Datensatz wird in eine JSON-Zeichenfolge aus Zeitstempeln, Schlüsseln und Durchschnittswerten formatiert und an den Stream ausgegeben. Der Zeitstempel ist auf ISO 8601 eingestellt, damit Daten problemlos zwischen verschiedenen Systemen ausgetauscht werden können. Speichern Sie den Datensatz unter dem zuletzt angegebenen Thema und beenden Sie den Vorgang.

App.java


        final DateTimeFormatter fmt =
            DateTimeFormatter.ISO_OFFSET_DATE_TIME;

        sensorAgg
            .<Double>mapValues((v) -> ((double) v.sum / v.count))
            .toStream()
            .map((key, avg) -> {
                    long end = key.window().end();
                    ZonedDateTime zdt =
                        new Date(end).toInstant()
                        .atZone(ZoneId.systemDefault());
                    String time = fmt.format(zdt);
                    String bid = key.key();
                    String retval =
                        String.format("{\"time\": \"%s\", \"bid\": \"%s\", \"ambient\": %f}",
                                      time, bid, avg);
                    LOG.info(retval);
                    return new KeyValue<String,String>(bid, retval);
             })
            .to(SINK_TOPIC);

Start von Kafka Streams

Erstellen Sie aus den Konfigurationsobjekten und dem Builder einen Kafka Streams (https://kafka.apache.org/0102/javadoc/org/apache/kafka/streams/KafkaStreams.html), um die Kafka Streams-App zu starten. Registrieren Sie es auch im Shutdown-Hook, um Kafka Stream mit SIGTERM zu stoppen.

App.java


        final StreamsConfig config = new StreamsConfig(getProperties());
        final KafkaStreams streams = new KafkaStreams(builder, config);
        streams.start();
        
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

Informationen zu den Einstellungen und zum Timeout von Kafka Streams

Erstellen Sie Eigenschaften, die in den Kafka Streams-Einstellungen verwendet werden sollen, aus Umgebungsvariablen.

App.java


    private static Properties getProperties() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG,
                  System.getenv("APPLICATION_ID_CONFIG"));
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
                  System.getenv("BOOTSTRAP_SERVERS_CONFIG"));
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
                  Serdes.String().getClass().getName());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
                  Serdes.String().getClass().getName());
        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                  WallclockTimestampExtractor.class.getName());
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
                  TimeUnit.MINUTES.toMillis(COMMIT_MINUTES));

        return props;
    }

COMMIT_INTERVAL_MS_CONFIG

Anfangs habe ich [StreamsConfig.COMMIT_INTERVAL_MS_CONFIG] nicht geändert (https://kafka.apache.org/0102/javadoc/org/apache/kafka/streams/StreamsConfig.html#COMMIT_INTERVAL_MS_CONFIG). Das Protokoll wird von map () von KStream ausgegeben, bevor der Datensatz als Thema gespeichert wird. Ich wollte das aggregierte Ergebnis des 2-Minuten-Fensterintervalls am Ende nur einmal ausgeben, aber das Ergebnis wurde 4-5 Mal nicht angegeben.

{"time": "2017-08-08T10:34:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.343750}
{"time": "2017-08-08T10:34:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.385417}
{"time": "2017-08-08T10:34:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.410156}
{"time": "2017-08-08T10:34:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.440341}
{"time": "2017-08-08T10:34:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.450521}
{"time": "2017-08-08T10:36:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.562500}
{"time": "2017-08-08T10:36:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.562500}

In Bezug auf die folgenden Artikel scheint dies das erwartete Verhalten aufgrund der Änderungsprotokoll-Stream-Funktion der KTable zu sein. Es gibt kein Endergebnis der Fensteraggregation in KTable, und die im Cache aktualisierten Werte werden in regelmäßigen Abständen festgeschrieben. Es scheint, dass Sie Ihren eigenen Code implementieren müssen, um doppelte Datensätze mit transform () undprocess ()nachtoStream ()to KStream zu entfernen.

Sie können doppelte Datensätze nicht vollständig entfernen, aber Sie können die Anzahl der Cache-Commits reduzieren, indem Sie den Wert von "StreamsConfig.COMMIT_INTERVAL_MS_CONFIG" erhöhen. Für [Standard] sind 30 Sekunden angegeben (http://docs.confluent.io/3.2.1/streams/developer-guide.html#optional-configuration-parameters).

Andere Klassen

Bereiten Sie die Klassen model (SensorSum.java), serializer (SensorSumSerializer.java) und deserializer (SensorSumDeserializer) vor. Der Serializer implementiert "serialize ()", um die Eigenschaften von "SensorSum" in ein Byte-Array zu konvertieren. Ordnen Sie 8 Bytes "Double" des gesamten Umgebungstemperaturwerts und 4 Bytes "Integer" der Anzahl der im Bytepuffer zu verwendenden Datensätze zu.

SensorSumSerializer.java


    public byte[] serialize(String topic, SensorSum data) {
        ByteBuffer buffer = ByteBuffer.allocate(8 + 4);
        buffer.putDouble(data.sum);
        buffer.putInt(data.count);

        return buffer.array();
    }

Lauf

Führen Sie Kafka Streams über das Exec Maven Plugin (http://www.mojohaus.org/exec-maven-plugin/) aus.

$ mvn clean install exec:exec@json

Ich habe das Fensterintervall auf 2 Minuten und das Cache-Festschreibungsintervall auf 3 Minuten eingestellt. Immerhin gibt es mehrmals doppelte Ausgaben, aber ich konnte die doppelten Ausgaben reduzieren.

{"time": "2017-08-08T11:32:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.414773}
{"time": "2017-08-08T11:34:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.414063}
{"time": "2017-08-08T11:36:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.453125}
{"time": "2017-08-08T11:36:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.476563}
{"time": "2017-08-08T11:38:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.546875}

Recommended Posts

Fensteraggregation von SensorTag mit Kafka Streams
Fensteraggregation von Sensordaten mit Apache Flink und Java 8