Previously, I used Jupyter as the environment for window aggregation of SensorTag data with PySpark Streaming. There are several other stream-processing frameworks, and next I will try Kafka Streams. Unlike Spark, Kafka Streams is a library, not a cluster. Java is currently the only officially supported development language.
I will write the code with Eclim, set up on Ubuntu 16.04, and build it with Maven.
Create the following files in the project directory. The complete code is in the repository (https://github.com/masato/streams-kafka-examples).
$ tree
.
├── pom.xml
└── src
    └── main
        ├── java
        │   └── com
        │       └── github
        │           └── masato
        │               └── streams
        │                   └── kafka
        │                       ├── App.java
        │                       ├── SensorSumDeserializer.java
        │                       ├── SensorSum.java
        │                       └── SensorSumSerializer.java
        └── resources
            └── logback.xml

9 directories, 7 files
App.java
I will explain the code in several parts.
Topic names and other settings are read from environment variables defined in pom.xml. WINDOWS_MINUTES is the window-aggregation interval, and COMMIT_MINUTES is the interval at which Kafka Streams automatically commits the cache, as described below. Both are specified in minutes.
App.java
public class App {
    private static final Logger LOG = LoggerFactory.getLogger(App.class);
    private static final String SOURCE_TOPIC = System.getenv("SOURCE_TOPIC");
    private static final String SINK_TOPIC = System.getenv("SINK_TOPIC");
    private static final long WINDOWS_MINUTES = 2L;
    private static final long COMMIT_MINUTES = 3L;
Next, create the record serializers and deserializers. A Kafka Streams app implements its processing flow by saving intermediate results to topics. A SerDe combines the deserializer used to read records from a topic with the serializer used to write records to it, and a SerDe is required for each key type and value type of a topic.
jsonSerde
In the SensorTag records, the key is a String and the value is a Jackson JsonNode object.
sensorSumSerde
SensorSum is a custom class that holds the window-aggregation state of the ambient temperature.
stringSerde
The built-in SerDe for String. In this example, all message keys are Strings.
doubleSerde
The built-in SerDe for Double. The SensorTag ambient temperature (ambient) is window-aggregated as a Double.
App.java
public static void main(String[] args) throws Exception {
    final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
    final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
    final Serde<JsonNode> jsonSerde =
        Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
    final Serializer<SensorSum> sensorSumSerializer =
        new SensorSumSerializer();
    final Deserializer<SensorSum> sensorSumDeserializer =
        new SensorSumDeserializer();
    final Serde<SensorSum> sensorSumSerde =
        Serdes.serdeFrom(sensorSumSerializer,
                         sensorSumDeserializer);
    final Serde<String> stringSerde = Serdes.String();
    final Serde<Double> doubleSerde = Serdes.Double();
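The SerDes above come from Kafka's Serdes factory and the project's own classes. To make the "serializer plus deserializer" pairing concrete without Kafka on the classpath, here is a JDK-only sketch. SimpleSerde is a hypothetical class, not Kafka's API. The byte layouts (UTF-8 for strings, 8 big-endian bytes for doubles) are my own choice for the sketch; I believe Kafka's built-in String and Double SerDes behave similarly by default, but the sketch does not depend on that.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical, JDK-only stand-in for a Kafka SerDe: a serializer and a
// deserializer for the same type, bundled together.
public class SerdeSketch {

    static final class SimpleSerde<T> {
        final Function<T, byte[]> serializer;
        final Function<byte[], T> deserializer;

        SimpleSerde(Function<T, byte[]> serializer, Function<byte[], T> deserializer) {
            this.serializer = serializer;
            this.deserializer = deserializer;
        }
    }

    // Strings as UTF-8 bytes.
    static final SimpleSerde<String> STRING = new SimpleSerde<>(
            s -> s.getBytes(StandardCharsets.UTF_8),
            b -> new String(b, StandardCharsets.UTF_8));

    // Doubles as 8 bytes in ByteBuffer's default (big-endian) byte order.
    static final SimpleSerde<Double> DOUBLE = new SimpleSerde<>(
            d -> ByteBuffer.allocate(8).putDouble(d).array(),
            b -> ByteBuffer.wrap(b).getDouble());

    public static void main(String[] args) {
        byte[] key = STRING.serializer.apply("B0:B4:48:BE:5E:00");
        byte[] value = DOUBLE.serializer.apply(27.03125);
        System.out.println(STRING.deserializer.apply(key));   // B0:B4:48:BE:5E:00
        System.out.println(DOUBLE.deserializer.apply(value)); // 27.03125
        System.out.println(value.length);                     // 8
    }
}
```

The point is only that every key and value type that crosses a topic boundary needs both directions, which is why the app builds one SerDe per type.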
First, call stream() on KStreamBuilder to create a [KStream](https://kafka.apache.org/0102/javadoc/org/apache/kafka/streams/kstream/KStream.html). The topic key is read with the String SerDe and the value with the JsonNode SerDe.
App.java
final KStreamBuilder builder = new KStreamBuilder();
LOG.info("Starting Sorting Job");
final KStream<String, JsonNode> source =
    builder.stream(stringSerde, jsonSerde, SOURCE_TOPIC);
The SensorTag message arrives on the Kafka topic as a JSON string from the Raspberry Pi 3.
{"bid": "B0:B4:48:BE:5E:00", "time": 1502152524, "humidity": 27.26287841796875, "objecttemp": 21.1875, "ambient": 27.03125, "rh": 75.311279296875}
A KStream record is a KeyValue object with a key and a value. Because we only want the window average of the ambient temperature (ambient), we call map() to create a new KStream whose records are pairs of the key and the ambient temperature.
Then call groupByKey() to group the records by key into a KGroupedStream. Each record's key is a String and its value is the ambient temperature as a Double, so we pass the corresponding SerDes.
App.java
final KGroupedStream<String, Double> sensors =
    source
        .map((k, v) -> {
            double ambient = v.get("ambient").asDouble();
            return KeyValue.pair(k, ambient);
        })
        .groupByKey(stringSerde, doubleSerde);
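As a rough in-memory analogue of map() followed by groupByKey(), the sketch below uses only the JDK: a Map<String, Object> stands in for the parsed JsonNode, and Java streams do the projection and grouping. It is an illustration of the data flow, not Kafka Streams code; the Reading class is hypothetical. One side note: since the key is not changed here, mapValues() would probably be the lighter choice in the real topology, because map() can make Kafka Streams repartition the stream before grouping.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// JDK-only analogue of source.map(...).groupByKey(...); not Kafka Streams code.
// A Map<String, Object> stands in for the parsed JsonNode value.
public class MapGroupSketch {

    // One incoming record: the message key (the SensorTag bid) and its parsed JSON.
    static final class Reading {
        final String key;
        final Map<String, Object> json;

        Reading(String key, Map<String, Object> json) {
            this.key = key;
            this.json = json;
        }
    }

    // "map": keep only the ambient temperature; "groupByKey": collect values per key.
    static Map<String, List<Double>> ambientByKey(List<Reading> readings) {
        return readings.stream()
                .collect(Collectors.groupingBy(
                        r -> r.key,
                        Collectors.mapping(
                                r -> ((Number) r.json.get("ambient")).doubleValue(),
                                Collectors.toList())));
    }

    public static void main(String[] args) {
        String bid = "B0:B4:48:BE:5E:00";
        List<Reading> readings = List.of(
                new Reading(bid, Map.of("ambient", 27.03125, "humidity", 27.26287841796875)),
                new Reading(bid, Map.of("ambient", 27.0625, "humidity", 27.3)));
        System.out.println(ambientByKey(readings)); // {B0:B4:48:BE:5E:00=[27.03125, 27.0625]}
    }
}
```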
Call aggregate() on the KGroupedStream to create a KTable. For each key, the KTable keeps the sum of the values and the number of records in each window.
The first argument of aggregate() is an [Initializer](https://kafka.apache.org/0102/javadoc/org/apache/kafka/streams/kstream/Initializer.html), which initializes the aggregate used for stream aggregation. Here it creates the SensorSum that holds the window-aggregation state. The second argument implements the Aggregator: it receives the key and value of the current record together with the SensorSum produced while processing the previous record, and returns a new SensorSum with the value added to the sum and the record count incremented, each time data arrives. The third argument defines the 2-minute [TimeWindows](http://apache.mesi.com.ar/kafka/0.10.2.0/javadoc/org/apache/kafka/streams/kstream/TimeWindows.html). The fourth argument is the SerDe for SensorSum, and the fifth is the name of the state store that holds the aggregation state.
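To show what this aggregation computes, here is a JDK-only sketch of tumbling windows: each timestamp is assigned to the window [start, start + size) whose start is a multiple of the window size counted from the epoch, and a running sum and count are kept per key and window. This is my own illustration of the semantics of TimeWindows.of(size) as used above, not the Kafka Streams implementation; the "key@windowStart" state key is a hypothetical choice.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;

// JDK-only sketch of what aggregate(...) with TimeWindows.of(size) computes:
// tumbling windows aligned to the epoch, with a running sum and count per
// (key, window). An illustration, not the Kafka Streams implementation.
public class TumblingWindowSketch {

    // Same two fields as the article's SensorSum: running sum and record count.
    static final class SensorSum {
        final double sum;
        final int count;

        SensorSum(double sum, int count) {
            this.sum = sum;
            this.count = count;
        }
    }

    static final long WINDOW_MS = TimeUnit.MINUTES.toMillis(2);

    // Start of the window containing a non-negative timestamp: [start, start + WINDOW_MS).
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_MS);
    }

    // Aggregation state keyed by a hypothetical "key@windowStart" string.
    private final Map<String, SensorSum> state = new TreeMap<>();

    SensorSum add(String key, long timestampMs, double ambient) {
        String stateKey = key + "@" + windowStart(timestampMs);
        // Initializer: an empty SensorSum for a window seen for the first time.
        SensorSum agg = state.getOrDefault(stateKey, new SensorSum(0D, 0));
        // Aggregator: add the value and count the record.
        SensorSum updated = new SensorSum(agg.sum + ambient, agg.count + 1);
        state.put(stateKey, updated);
        return updated;
    }

    public static void main(String[] args) {
        TumblingWindowSketch windows = new TumblingWindowSketch();
        String bid = "B0:B4:48:BE:5E:00";
        windows.add(bid, 10_000L, 27.0);
        SensorSum first = windows.add(bid, 70_000L, 28.0);   // same window [0, 120000)
        SensorSum second = windows.add(bid, 130_000L, 26.0); // next window [120000, 240000)
        System.out.println(first.sum / first.count);   // 27.5
        System.out.println(second.sum / second.count); // 26.0
    }
}
```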
App.java
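final KTable<Windowed<String>, SensorSum> sensorAgg =
    sensors
        .aggregate(
            () -> new SensorSum(0D, 0),                      // Initializer: empty aggregate
            (aggKey, value, agg) ->                          // Aggregator: add value, count record
                new SensorSum(agg.sum + value, agg.count + 1),
            TimeWindows.of(TimeUnit.MINUTES.toMillis(WINDOWS_MINUTES)), // 2-minute windows
            sensorSumSerde,                                  // SerDe for SensorSum
            "sensorSum");                                    // state store name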
Compute the mean with mapValues() on the KTable: dividing the sum by the number of records yields a new KTable whose values are Doubles. Then call toStream() to turn it into a KStream. Each record is formatted as a JSON string containing a timestamp, the key, and the mean, and emitted to the stream. Timestamps use ISO 8601 to make data exchange between different systems easier. Finally, the records are saved to the sink topic.
App.java
final DateTimeFormatter fmt =
    DateTimeFormatter.ISO_OFFSET_DATE_TIME;
sensorAgg
    .<Double>mapValues((v) -> ((double) v.sum / v.count))
    .toStream()
    .map((key, avg) -> {
        long end = key.window().end();
        ZonedDateTime zdt =
            new Date(end).toInstant()
                .atZone(ZoneId.systemDefault());
        String time = fmt.format(zdt);
        String bid = key.key();
        String retval =
            String.format("{\"time\": \"%s\", \"bid\": \"%s\", \"ambient\": %f}",
                          time, bid, avg);
        LOG.info(retval);
        return new KeyValue<String,String>(bid, retval);
    })
    .to(SINK_TOPIC);
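The formatting step can be tried on its own with the JDK. This sketch takes the zone as a parameter instead of ZoneId.systemDefault() so the output is reproducible, uses Instant.ofEpochMilli() rather than going through java.util.Date, and passes Locale.ROOT to String.format so the decimal separator stays a dot on JVMs with other default locales (otherwise the JSON could come out with a comma). The toJson helper and the sample sum and count are hypothetical; the expected line matches the first log entry shown below.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

public class WindowResultFormatSketch {

    // Formats one window result like the map() step above, with an explicit zone.
    static String toJson(long windowEndMs, String bid, double sum, int count, ZoneId zone) {
        double avg = sum / count;
        ZonedDateTime zdt = Instant.ofEpochMilli(windowEndMs).atZone(zone);
        String time = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(zdt);
        // Locale.ROOT keeps '.' as the decimal separator regardless of the JVM locale.
        return String.format(Locale.ROOT,
                "{\"time\": \"%s\", \"bid\": \"%s\", \"ambient\": %f}", time, bid, avg);
    }

    public static void main(String[] args) {
        ZoneId tokyo = ZoneId.of("Asia/Tokyo");
        long end = ZonedDateTime.of(2017, 8, 8, 10, 34, 0, 0, tokyo).toInstant().toEpochMilli();
        System.out.println(toJson(end, "B0:B4:48:BE:5E:00", 109.375, 4, tokyo));
        // {"time": "2017-08-08T10:34:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.343750}
    }
}
```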
Create a KafkaStreams instance from the configuration object and the builder, and start the app. Also register close() in a shutdown hook so that Kafka Streams stops on SIGTERM.
App.java
final StreamsConfig config = new StreamsConfig(getProperties());
final KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
Build the Properties for the Kafka Streams configuration from environment variables.
App.java
private static Properties getProperties() {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG,
              System.getenv("APPLICATION_ID_CONFIG"));
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
              System.getenv("BOOTSTRAP_SERVERS_CONFIG"));
    props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
              Serdes.String().getClass().getName());
    props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
              Serdes.String().getClass().getName());
    props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
              WallclockTimestampExtractor.class.getName());
    props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
              TimeUnit.MINUTES.toMillis(COMMIT_MINUTES));
    return props;
}
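One thing to watch in getProperties(): System.getenv() returns null for an unset variable, and Properties does not accept null values, so a missing variable surfaces as a NullPointerException that does not name the variable. A minimal sketch of a fail-fast alternative, assuming the environment is passed in as a Map (for example System.getenv()) so it can be tested. The literal keys are, as far as I know, the string values of StreamsConfig.APPLICATION_ID_CONFIG, BOOTSTRAP_SERVERS_CONFIG, and COMMIT_INTERVAL_MS_CONFIG; they are written out here only so the sketch compiles without Kafka. The require and fromEnv helpers and the sample values are hypothetical.

```java
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class StreamsPropsSketch {

    // Hypothetical helper: fail with a message that names the missing variable.
    static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.isEmpty()) {
            throw new IllegalStateException("Missing environment variable: " + name);
        }
        return value;
    }

    // Builds a subset of the app's settings from an injected environment map.
    static Properties fromEnv(Map<String, String> env, long commitMinutes) {
        Properties props = new Properties();
        props.put("application.id", require(env, "APPLICATION_ID_CONFIG"));
        props.put("bootstrap.servers", require(env, "BOOTSTRAP_SERVERS_CONFIG"));
        props.put("commit.interval.ms", TimeUnit.MINUTES.toMillis(commitMinutes));
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical sample values; in the app these would come from System.getenv().
        Map<String, String> env = Map.of(
                "APPLICATION_ID_CONFIG", "sensortag-sketch",
                "BOOTSTRAP_SERVERS_CONFIG", "localhost:9092");
        Properties props = fromEnv(env, 3L);
        System.out.println(props.getProperty("application.id")); // sensortag-sketch
        System.out.println(props.get("commit.interval.ms"));     // 180000
    }
}
```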
COMMIT_INTERVAL_MS_CONFIG
Initially, I left StreamsConfig.COMMIT_INTERVAL_MS_CONFIG at its default. The map() of the KStream logs each record before it is saved to the topic. I wanted the aggregate of each 2-minute window to be output only once, at the end of the window, but it was output an unpredictable number of times, 4 to 5 per window.
{"time": "2017-08-08T10:34:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.343750}
{"time": "2017-08-08T10:34:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.385417}
{"time": "2017-08-08T10:34:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.410156}
{"time": "2017-08-08T10:34:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.440341}
{"time": "2017-08-08T10:34:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.450521}
{"time": "2017-08-08T10:36:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.562500}
{"time": "2017-08-08T10:36:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.562500}
According to the following articles, this appears to be expected behavior, because a KTable is a changelog stream. A KTable has no notion of a final result for a window aggregation; the values updated in the cache are committed at regular intervals. To remove the duplicate records, it seems you need to write your own code with transform() and process() after converting the KTable to a KStream with toStream().
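Here is one possible deduplication approach, sketched with plain JDK collections rather than the transform()/process() API: keep only the latest update per key and window, and release a window's value once an update for a newer window of the same key arrives. This is my own illustration, not the author's code. It assumes updates for a key arrive in window order (plausible here, since WallclockTimestampExtractor assigns processing time), and the newest window stays buffered until the next one starts. In a real app this state would have to live in a state store so it survives restarts. Newer Kafka Streams releases (2.1 and later) added KTable#suppress() with Suppressed.untilWindowCloses() for this problem, but that is not available in the 0.10.2 API used here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// JDK-only sketch: keep the latest update per (key, window) and release a
// window's update once a newer window for the same key shows up.
public class LatestPerWindowSketch {

    // One update from the windowed aggregation, after toStream().
    static final class Update {
        final String key;
        final long windowStart;
        final double avg;

        Update(String key, long windowStart, double avg) {
            this.key = key;
            this.windowStart = windowStart;
            this.avg = avg;
        }

        @Override
        public String toString() {
            return key + "@" + windowStart + "=" + avg;
        }
    }

    // Pending (not yet emitted) latest update per key, ordered by window start.
    private final Map<String, TreeMap<Long, Update>> pending = new HashMap<>();

    // Buffers the update and returns the updates of older windows for the same
    // key, which are treated as final.
    List<Update> accept(Update u) {
        TreeMap<Long, Update> windows = pending.computeIfAbsent(u.key, k -> new TreeMap<>());
        windows.put(u.windowStart, u); // replaces any earlier update for the same window
        Map<Long, Update> older = windows.headMap(u.windowStart, false);
        List<Update> closed = new ArrayList<>(older.values());
        older.clear(); // a view, so this also removes them from the pending map
        return closed;
    }

    public static void main(String[] args) {
        LatestPerWindowSketch dedup = new LatestPerWindowSketch();
        String bid = "B0:B4:48:BE:5E:00";
        // The five 10:34 updates from the log above, with 0 as a stand-in window start.
        for (double avg : new double[] {27.343750, 27.385417, 27.410156, 27.440341, 27.450521}) {
            dedup.accept(new Update(bid, 0L, avg));
        }
        // The first update for the next window releases only the last 10:34 value.
        System.out.println(dedup.accept(new Update(bid, 120_000L, 27.5625)));
        // [B0:B4:48:BE:5E:00@0=27.450521]
    }
}
```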
This does not eliminate duplicate records completely, but increasing StreamsConfig.COMMIT_INTERVAL_MS_CONFIG reduces how often the cache is committed, and therefore how many duplicates appear. The Default value (http://docs.confluent.io/3.2.1/streams/developer-guide.html#optional-configuration-parameters) is 30 seconds.
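To get a feel for why a longer commit interval helps, here is a back-of-the-envelope upper bound. It is my own model, not something taken from the Kafka documentation: with wall-clock timestamps and records arriving continuously, a window's cached aggregate is forwarded at every commit that falls inside the window, plus once more at the first commit after the window ends. It ignores other flush triggers such as the record cache filling up. Under those assumptions, a 2-minute window with the 30-second default gives at most 5 outputs, consistent with the five 10:34 lines in the log above, and a 3-minute commit interval gives at most 2.

```java
import java.util.concurrent.TimeUnit;

// Rough model (an assumption, not Kafka's documented behavior): maximum number
// of times one window's aggregate is forwarded downstream.
public class CommitEstimateSketch {

    static long maxEmitsPerWindow(long windowMs, long commitMs) {
        long commitsInside = (windowMs + commitMs - 1) / commitMs; // ceil(windowMs / commitMs)
        return commitsInside + 1; // plus the flush after the window closes
    }

    public static void main(String[] args) {
        long window = TimeUnit.MINUTES.toMillis(2);
        System.out.println(maxEmitsPerWindow(window, TimeUnit.SECONDS.toMillis(30))); // 5
        System.out.println(maxEmitsPerWindow(window, TimeUnit.MINUTES.toMillis(3)));  // 2
    }
}
```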
Prepare the model (SensorSum.java), serializer (SensorSumSerializer.java), and deserializer (SensorSumDeserializer.java) classes. The serializer implements serialize() to convert the fields of SensorSum to a byte array: it allocates a byte buffer with 8 bytes for the Double sum of the ambient temperatures and 4 bytes for the Integer record count.
SensorSumSerializer.java
public byte[] serialize(String topic, SensorSum data) {
    ByteBuffer buffer = ByteBuffer.allocate(8 + 4);
    buffer.putDouble(data.sum);
    buffer.putInt(data.count);
    return buffer.array();
}
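The deserializer body is not shown above, so here is a guess at what SensorSumDeserializer needs to do, written as a JDK-only round trip rather than the repository's code: read the fields back in the same order and with the same widths the serializer wrote them. In the real class this logic would sit inside Kafka's deserialize(String topic, byte[] data) method. Returning null for null input is a common convention for Kafka deserializers and is an assumption here; the SensorSum class is a minimal stand-in with the two fields the article uses.

```java
import java.nio.ByteBuffer;

public class SensorSumCodecSketch {

    // Minimal stand-in for the article's SensorSum model.
    static final class SensorSum {
        final double sum;
        final int count;

        SensorSum(double sum, int count) {
            this.sum = sum;
            this.count = count;
        }
    }

    // Same layout as SensorSumSerializer.serialize(): 8-byte double, then 4-byte int.
    static byte[] serialize(SensorSum data) {
        ByteBuffer buffer = ByteBuffer.allocate(8 + 4);
        buffer.putDouble(data.sum);
        buffer.putInt(data.count);
        return buffer.array();
    }

    // Reads the fields back in the order they were written.
    static SensorSum deserialize(byte[] bytes) {
        if (bytes == null) {
            return null; // assumed convention for missing values
        }
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        double sum = buffer.getDouble();
        int count = buffer.getInt();
        return new SensorSum(sum, count);
    }

    public static void main(String[] args) {
        SensorSum restored = deserialize(serialize(new SensorSum(109.375, 4)));
        System.out.println(restored.sum + " / " + restored.count); // 109.375 / 4
    }
}
```

Because both sides use ByteBuffer's default big-endian order and a fixed 12-byte layout, the two methods only need to agree on field order.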
Run the Kafka Streams app with the Exec Maven Plugin (http://www.mojohaus.org/exec-maven-plugin/).
$ mvn clean install exec:exec@json
With the window interval set to 2 minutes and the cache commit interval set to 3 minutes, duplicate outputs still occur occasionally, but there are far fewer of them.
{"time": "2017-08-08T11:32:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.414773}
{"time": "2017-08-08T11:34:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.414063}
{"time": "2017-08-08T11:36:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.453125}
{"time": "2017-08-08T11:36:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.476563}
{"time": "2017-08-08T11:38:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.546875}