[Ich habe versucht, Fenster zu aggregieren] mit Jupyter als Betriebsumgebung für SensorTag-Daten in PySpark Streaming (http://qiita.com/masato/items/574b38e45014a6ae7d88). Es gibt mehrere andere Stream-Verarbeitungs-Frameworks, aber als Nächstes werde ich versuchen, Kafka-Streams zu verwenden (http://docs.confluent.io/current/streams/index.html). Im Gegensatz zu Spark ist dies eine Bibliothek, kein Cluster. Derzeit unterstützt die Entwicklungssprache offiziell nur Java.
Ich werde den Code von Eclim schreiben, der auf Ubuntu 16.04 mit Maven basiert.
Erstellen Sie die folgenden Dateien im Projektverzeichnis. Den vollständigen Code finden Sie hier im Repository (https://github.com/masato/streams-kafka-examples).
$ tree
.
├── pom.xml
└── src
└── main
├── java
│ └── com
│ └── github
│ └── masato
│ └── streams
│ └── kafka
│ ├── App.java
│ ├── SensorSumDeserializer.java
│ ├── SensorSum.java
│ └── SensorSumSerializer.java
└── resources
└── logback.xml
9 directories, 7 files
App.java
Ich werde den Code in mehreren Teilen erklären.
Der Themenname usw. wird aus den in pom.xml definierten Umgebungsvariablen abgerufen. WINDOWS_MINUTES
ist das Intervall für die Fensteraggregation. COMMIT_MINUTES
ist das Intervall, in dem Kafka den Cache automatisch festschreibt, wie unten beschrieben. Hier in Minuten angeben.
App.java
public class App {
private static final Logger LOG = LoggerFactory.getLogger(App.class);
private static final String SOURCE_TOPIC = System.getenv("SOURCE_TOPIC");
private static final String SINK_TOPIC = System.getenv("SINK_TOPIC");
private static final long WINDOWS_MINUTES = 2L;
private static final long COMMIT_MINUTES = 3L;
Erstellen Sie einen Serializer und Deserializer für den Datensatz. Die Kafka Streams-App speichert die Zwischenergebnisse des Prozesses in einem Thema und implementiert den Ablauf. SerDe wird definiert, indem der Deserializer zum Lesen von Datensätzen aus einem Thema und der Serializer zum Schreiben von Datensätzen kombiniert werden. SerDe ist für jeden Themenschlüssel und Werttyp erforderlich.
jsonSerde Im SensorTag-Datensatz ist der Schlüssel eine Zeichenfolge und der Wert ist JsonNode von Jackson. com / schnellerxml / jackson / database / JsonNode.html) Objekt.
sensorSumSerde
SenroSum
ist eine Klasse, die den Status der benutzerdefinierten Umgebungstemperatur und Fensteraggregation enthält.
stringSerde SerDe für die Standardzeichenfolge. Diesmal sind die Nachrichtenschlüssel alle "String".
doubleSerde SerDe für das Standard-Double. Die Umgebungstemperatur (Umgebungstemperatur) des SensorTags wird mit "double" fensteraggregiert.
App.java
public static void main(String[] args) throws Exception {
final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
final Serde<JsonNode> jsonSerde =
Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
final Serializer<SensorSum> sensorSumSerializer =
new SensorSumSerializer();
final Deserializer<SensorSum> sensorSumDeserializer =
new SensorSumDeserializer();
final Serde<SensorSum> sensorSumSerde =
Serdes.serdeFrom(sensorSumSerializer,
sensorSumDeserializer);
final Serde<String> stringSerde = Serdes.String();
final Serde<Double> doubleSerde = Serdes.Double();
Rufen Sie zuerst "stream ()" von KStreamBuilder auf und rufen Sie [KStream](https: // kafka.apache.org/0102/javadoc/org/apache/kafka/streams/kstream/KStream.html). Der Schlüssel des Themas ist eine Zeichenfolge, und der Wert ist SerDe von JsonNode.
App.java
final KStreamBuilder builder = new KStreamBuilder();
LOG.info("Starting Sorting Job");
final KStream<String, JsonNode> source =
builder.stream(stringSerde, jsonSerde, SOURCE_TOPIC);
Die SensorTag-Nachricht kommt von Raspberry Pi 3 als JSON-Zeichenfolge zum Kafka-Thema.
{'bid': 'B0:B4:48:BE:5E:00', 'time': 1502152524, 'humidity': 27.26287841796875, 'objecttemp': 21.1875, 'ambient': 27.03125, 'rh': 75.311279296875}
Ein KStream-Datensatz ist ein [KeyValue] -Objekt (https://kafka.apache.org/0102/javadoc/org/apache/kafka/streams/KeyValue.html) mit einem Schlüssel und einem Wert. Um im Beispiel nur den Durchschnittswert der Umgebungstemperatur (Umgebungstemperatur) im Fenster zu aggregieren, rufen Sie "map ()" auf und erstellen Sie einen neuen KStream mit nur dem Paar aus Schlüssel und Umgebungstemperatur.
Rufen Sie dann groupByKey ()
und group by key auf, um [KGroupedStream] zu erstellen (https://kafka.apache.org/0102/javadoc/org/apache/kafka/streams/kstream/KGroupedStream.html). .. Da der Schlüssel des Datensatzes eine Zeichenfolge ist und der Wert "doppelt" der Umgebungstemperatur ist, geben Sie jede SerDe an.
App.java
final KGroupedStream<String, Double> sensors =
source
.map((k, v) -> {
double ambient = v.get("ambient").asDouble();
return KeyValue.pair(k, ambient);
})
.groupByKey(stringSerde, doubleSerde);
Rufen Sie aggregat ()
von KGroupedStream auf, um [KTable] zu erstellen (https://kafka.apache.org/0102/javadoc/org/apache/kafka/streams/kstream/KTable.html). KTable speichert den Gesamtwert der Datensätze und den Status der Anzahl der Datensätze in dem für jeden Schlüssel angegebenen Fensterintervall.
Im ersten Argument von aggregat ()
, Initializer, dem für die Stream-Aggregation verwendeten Aggregator Initialisieren. Hier wird die "SensorSum" initialisiert, die den Status der Fensteraggregation enthält. Implementieren Sie den Aggregator mit dem zweiten Argument. Der Schlüssel und der Wert des aktuellen Datensatzes und die im vorherigen Datensatzprozess erstellte "Sensorsumme" werden übergeben. Gibt eine neue "Sensorsumme" zurück, indem der Gesamtwert und die Anzahl der Datensätze jedes Mal addiert werden, wenn Daten eintreffen. Das dritte Argument definiert das 2-Minuten-Fenster TimeWindows. .. Das vierte Argument ist SerDe von "SensorSum", und das fünfte Argument ist der Themenname, der den Status enthält.
App.java
final KTable<Windowed<String>, SensorSum> sensorAgg =
sensors
.aggregate(() -> new SensorSum(0D, 0)
, (aggKey, value, agg) -> new SensorSum(agg.sum + value, agg.count + 1)
, TimeWindows.of(TimeUnit.MINUTES.toMillis(WINDOWS_MINUTES))
, sensorSumSerde,
"sensorSum");
Berechnen Sie den Durchschnittswert mit mapValues ()
von KTable. Der Durchschnitt der Summe geteilt durch die Anzahl der Datensätze ist die neue KTable für den "Double" -Datensatz. Rufen Sie von hier aus toStream ()
auf, um einen KStream zu erstellen. Der Datensatz wird in eine JSON-Zeichenfolge aus Zeitstempeln, Schlüsseln und Durchschnittswerten formatiert und an den Stream ausgegeben. Der Zeitstempel ist auf ISO 8601 eingestellt, damit Daten problemlos zwischen verschiedenen Systemen ausgetauscht werden können. Speichern Sie den Datensatz unter dem zuletzt angegebenen Thema und beenden Sie den Vorgang.
App.java
final DateTimeFormatter fmt =
DateTimeFormatter.ISO_OFFSET_DATE_TIME;
sensorAgg
.<Double>mapValues((v) -> ((double) v.sum / v.count))
.toStream()
.map((key, avg) -> {
long end = key.window().end();
ZonedDateTime zdt =
new Date(end).toInstant()
.atZone(ZoneId.systemDefault());
String time = fmt.format(zdt);
String bid = key.key();
String retval =
String.format("{\"time\": \"%s\", \"bid\": \"%s\", \"ambient\": %f}",
time, bid, avg);
LOG.info(retval);
return new KeyValue<String,String>(bid, retval);
})
.to(SINK_TOPIC);
Erstellen Sie aus den Konfigurationsobjekten und dem Builder einen Kafka Streams (https://kafka.apache.org/0102/javadoc/org/apache/kafka/streams/KafkaStreams.html), um die Kafka Streams-App zu starten. Registrieren Sie es auch im Shutdown-Hook, um Kafka Stream mit SIGTERM
zu stoppen.
App.java
final StreamsConfig config = new StreamsConfig(getProperties());
final KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
Erstellen Sie Eigenschaften, die in den Kafka Streams-Einstellungen verwendet werden sollen, aus Umgebungsvariablen.
App.java
private static Properties getProperties() {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG,
System.getenv("APPLICATION_ID_CONFIG"));
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
System.getenv("BOOTSTRAP_SERVERS_CONFIG"));
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
WallclockTimestampExtractor.class.getName());
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
TimeUnit.MINUTES.toMillis(COMMIT_MINUTES));
return props;
}
COMMIT_INTERVAL_MS_CONFIG
Anfangs habe ich [StreamsConfig.COMMIT_INTERVAL_MS_CONFIG] nicht geändert (https://kafka.apache.org/0102/javadoc/org/apache/kafka/streams/StreamsConfig.html#COMMIT_INTERVAL_MS_CONFIG). Das Protokoll wird von map () von KStream ausgegeben, bevor der Datensatz als Thema gespeichert wird. Ich wollte das aggregierte Ergebnis des 2-Minuten-Fensterintervalls am Ende nur einmal ausgeben, aber das Ergebnis wurde 4-5 Mal nicht angegeben.
{"time": "2017-08-08T10:34:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.343750}
{"time": "2017-08-08T10:34:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.385417}
{"time": "2017-08-08T10:34:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.410156}
{"time": "2017-08-08T10:34:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.440341}
{"time": "2017-08-08T10:34:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.450521}
{"time": "2017-08-08T10:36:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.562500}
{"time": "2017-08-08T10:36:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.562500}
In Bezug auf die folgenden Artikel scheint dies das erwartete Verhalten aufgrund der Änderungsprotokoll-Stream-Funktion der KTable zu sein. Es gibt kein Endergebnis der Fensteraggregation in KTable, und die im Cache aktualisierten Werte werden in regelmäßigen Abständen festgeschrieben. Es scheint, dass Sie Ihren eigenen Code implementieren müssen, um doppelte Datensätze mit transform ()
undprocess ()
nachtoStream ()
to KStream zu entfernen.
Sie können doppelte Datensätze nicht vollständig entfernen, aber Sie können die Anzahl der Cache-Commits reduzieren, indem Sie den Wert von "StreamsConfig.COMMIT_INTERVAL_MS_CONFIG" erhöhen. Für [Standard] sind 30 Sekunden angegeben (http://docs.confluent.io/3.2.1/streams/developer-guide.html#optional-configuration-parameters).
Bereiten Sie die Klassen model (SensorSum.java), serializer (SensorSumSerializer.java) und deserializer (SensorSumDeserializer) vor. Der Serializer implementiert "serialize ()", um die Eigenschaften von "SensorSum" in ein Byte-Array zu konvertieren. Ordnen Sie 8 Bytes "Double" des gesamten Umgebungstemperaturwerts und 4 Bytes "Integer" der Anzahl der im Bytepuffer zu verwendenden Datensätze zu.
SensorSumSerializer.java
public byte[] serialize(String topic, SensorSum data) {
ByteBuffer buffer = ByteBuffer.allocate(8 + 4);
buffer.putDouble(data.sum);
buffer.putInt(data.count);
return buffer.array();
}
Führen Sie Kafka Streams über das Exec Maven Plugin (http://www.mojohaus.org/exec-maven-plugin/) aus.
$ mvn clean install exec:exec@json
Ich habe das Fensterintervall auf 2 Minuten und das Cache-Festschreibungsintervall auf 3 Minuten eingestellt. Immerhin gibt es mehrmals doppelte Ausgaben, aber ich konnte die doppelten Ausgaben reduzieren.
{"time": "2017-08-08T11:32:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.414773}
{"time": "2017-08-08T11:34:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.414063}
{"time": "2017-08-08T11:36:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.453125}
{"time": "2017-08-08T11:36:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.476563}
{"time": "2017-08-08T11:38:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.546875}