Continuing with the Tutorial: Real-time analysis of stock data using Kinesis Data Streams.
The learning-module-1 branch used for learning has gaps marked with `// TODO: Implement method`. Based on this branch (it apparently can't be built as-is), I'll work through the tutorial while comparing it with the master branch.
Start with [Step 4: Implement the Producer](https://docs.aws.amazon.com/ja_jp/streams/latest/dev/learning-kinesis-module-one-producer.html). This is the part that writes records using the AWS SDK for Java (the KPL is not used). Just follow the procedure described in the tutorial. The only missing piece is:

`writer.StockTradesWriter.sendStockTrade()`
```java
// Types used (AWS SDK for Java 1.x): com.amazonaws.AmazonClientException,
// com.amazonaws.services.kinesis.AmazonKinesis,
// com.amazonaws.services.kinesis.model.PutRecordRequest, java.nio.ByteBuffer

/**
 * Send stock trade information to the given stream using the Kinesis client
 *
 * @param trade Instance representing a stock trade
 * @param kinesisClient Kinesis client
 * @param streamName Stream name
 */
private static void sendStockTrade(StockTrade trade, AmazonKinesis kinesisClient,
        String streamName) {
    byte[] bytes = trade.toJsonAsBytes();
    // toJsonAsBytes() returns null if Jackson (the JSON library) fails to serialize the trade
    if (bytes == null) {
        LOG.warn("Could not get JSON bytes for stock trade");
        return;
    }

    LOG.info("Putting trade: " + trade.toString());
    PutRecordRequest putRecord = new PutRecordRequest();
    putRecord.setStreamName(streamName);
    // Use the ticker symbol as the partition key
    putRecord.setPartitionKey(trade.getTickerSymbol());
    putRecord.setData(ByteBuffer.wrap(bytes));

    try {
        kinesisClient.putRecord(putRecord);
    } catch (AmazonClientException ex) {
        LOG.warn("Error sending record to Amazon Kinesis.", ex);
    }
}
```
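The partition key set in `sendStockTrade()` decides which shard a record lands in: Kinesis hashes the partition key with MD5 into a 128-bit integer, and each shard owns a range of those hash values. The following is a minimal sketch of that mapping, assuming the key is hashed as its UTF-8 bytes and that the shards split the hash range evenly (roughly what a freshly created stream looks like). `PartitionKeyRouting` is a hypothetical helper, not part of the AWS SDK; real hash key ranges should be read from the stream's shard list.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/**
 * Hypothetical helper: maps a partition key (e.g. a ticker symbol) to a shard index.
 * ASSUMPTIONS: MD5 over the UTF-8 bytes of the key, and shards that split the
 * 128-bit hash key space into equal contiguous ranges.
 */
class PartitionKeyRouting {
    // 2^128: size of the hash key space
    static final BigInteger HASH_SPACE = BigInteger.ONE.shiftLeft(128);

    /** MD5 digest of the key, interpreted as an unsigned 128-bit integer. */
    static BigInteger hashKey(String partitionKey) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            return new BigInteger(1, digest); // signum 1 = treat the bytes as unsigned
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is not available", e);
        }
    }

    /** Index of the shard whose (evenly split) hash range contains the key. */
    static int shardIndex(String partitionKey, int shardCount) {
        // floor(hash * shardCount / 2^128) is always in [0, shardCount - 1]
        return hashKey(partitionKey)
                .multiply(BigInteger.valueOf(shardCount))
                .divide(HASH_SPACE)
                .intValueExact();
    }
}
```

With only a handful of ticker symbols, several keys can hash into the same shard while other shards sit idle, which is why the tutorial recommends many partition keys per shard.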
- Because the PutRecord API takes the data as a byte array, the object representing the stock trade is converted to bytes with [`toJsonAsBytes()`](https://github.com/aws-samples/amazon-kinesis-learning/blob/master/src/com/amazonaws/services/kinesis/samples/stocktrades/model/StockTrade.java#L80) (internally, `jackson.databind.ObjectMapper`'s [`writeValueAsBytes()`](https://fasterxml.github.io/jackson-databind/javadoc/2.7/com/fasterxml/jackson/databind/ObjectMapper.html#writeValueAsBytes(java.lang.Object))).
- The [ticker symbol](https://ja.wikipedia.org/wiki/%E3%83%86%E3%82%A3%E3%83%83%E3%82%AB%E3%83%BC%E3%82%B7%E3%83%B3%E3%83%9C%E3%83%AB) is used as the partition key. According to the tutorial, the guideline is hundreds or thousands of partition keys per shard. (For how the destination shard is determined, see the previous article, leo-mon/items/45602438bb3e9ad220ca#%E5%87%BA%E5%8A%9B%E3%83%91%E3%83%A9%E3%83%A1%E3%83%BC%E3%82%BF%E3%81%AB%E3%81%A4%E3%81%84%E3%81%A6.)
- Write failures, such as exceeding the per-shard write limit, hitting API call limits, or network connection errors, need to be handled (here they are simply caught with `try ... catch`).
- This time I used the PutRecord API, but if a large number of records are generated, consider the PutRecords API, which sends multiple records in one request (for details, see Add data to stream).
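When switching to PutRecords, records have to be grouped into requests that stay within the API limits. Below is a minimal sketch, assuming the documented limits of 500 records and 5 MiB (data plus partition keys) per request. `PutRecordsBatcher` and its `Entry` type are hypothetical stand-ins, not the SDK's `PutRecordsRequestEntry`, and the per-record size limit is not checked here.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper: splits records into batches that fit one PutRecords request.
 * ASSUMED limits: at most 500 records and 5 MiB (data + partition keys) per request.
 */
class PutRecordsBatcher {
    static final int MAX_RECORDS = 500;
    static final long MAX_BYTES = 5L * 1024 * 1024;

    /** Hypothetical record type: partition key plus payload bytes. */
    static final class Entry {
        final String partitionKey;
        final byte[] data;

        Entry(String partitionKey, byte[] data) {
            this.partitionKey = partitionKey;
            this.data = data;
        }

        /** Size counted toward the request limit: payload + UTF-8 partition key. */
        long size() {
            return data.length + partitionKey.getBytes(StandardCharsets.UTF_8).length;
        }
    }

    static List<List<Entry>> batch(List<Entry> entries) {
        List<List<Entry>> batches = new ArrayList<>();
        List<Entry> current = new ArrayList<>();
        long currentBytes = 0;
        for (Entry e : entries) {
            // Close the current batch if adding this entry would break either limit
            if (!current.isEmpty()
                    && (current.size() == MAX_RECORDS || currentBytes + e.size() > MAX_BYTES)) {
                batches.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(e);
            currentBytes += e.size();
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }
}
```

Unlike PutRecord, a PutRecords call can partially succeed: the response reports a `FailedRecordCount`, and the failed entries have to be resent.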
Next comes [Step 5: Implement the Consumer](https://docs.aws.amazon.com/ja_jp/streams/latest/dev/learning-kinesis-module-one-consumer.html). The missing pieces are:

- `processor.StockTradeRecordProcessor.reportStats()`
- `processor.StockTradeRecordProcessor.resetStats()`
- `processor.StockTradeRecordProcessor.processRecord()`
To understand how these are called, I'll trace the code starting from the [`processor.StockTradesProcessor`](https://github.com/aws-samples/amazon-kinesis-learning/blob/learning-module-1/src/com/amazonaws/services/kinesis/samples/stocktrades/processor/StockTradesProcessor.java) class. Here is the important part of its `main()` method:
```java
public static void main(String[] args) throws Exception {
    ...

    // KCL configuration
    KinesisClientLibConfiguration kclConfig =
            new KinesisClientLibConfiguration(applicationName, streamName, credentialsProvider, workerId)
                    .withRegionName(region.getName())
                    .withCommonClientConfig(ConfigurationUtils.getClientConfigWithUserAgent());

    // Factory that creates IRecordProcessor instances (one per shard)
    IRecordProcessorFactory recordProcessorFactory = new StockTradeRecordProcessorFactory();

    // Create the worker
    Worker worker = new Worker(recordProcessorFactory, kclConfig);

    int exitCode = 0;
    try {
        // Run the worker (blocks while it processes the stream)
        worker.run();
    } catch (Throwable t) {
        LOG.error("Caught throwable while processing data.", t);
        exitCode = 1;
    }
    System.exit(exitCode);
}
```
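Running an application with the KCL breaks down roughly into three steps: configuring the KCL (`KinesisClientLibConfiguration`), providing a factory for record processors (`IRecordProcessorFactory`), and creating and running a `Worker`. The second of these, the class implementing the `IRecordProcessor` interface and its factory (here `StockTradeRecordProcessor` and `StockTradeRecordProcessorFactory`), is what the user implements.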
`processor.StockTradeRecordProcessor`

The methods that were left blank, [`reportStats()`](https://github.com/aws-samples/amazon-kinesis-learning/blob/learning-module-1/src/com/amazonaws/services/kinesis/samples/stocktrades/processor/StockTradeRecordProcessor.java#L88), [`resetStats()`](https://github.com/aws-samples/amazon-kinesis-learning/blob/learning-module-1/src/com/amazonaws/services/kinesis/samples/stocktrades/processor/StockTradeRecordProcessor.java#L92), and [`processRecord()`](https://github.com/aws-samples/amazon-kinesis-learning/blob/learning-module-1/src/com/amazonaws/services/kinesis/samples/stocktrades/processor/StockTradeRecordProcessor.java#L96), are in this class.
StockTradeRecordProcessor.java
```java
public class StockTradeRecordProcessor implements IRecordProcessor {

    ...

    /**
     * Initialization: receives the ID of the shard this instance will process
     */
    @Override
    public void initialize(String shardId) {
        LOG.info("Initializing record processor for shard: " + shardId);
        this.kinesisShardId = shardId;
        nextReportingTimeInMillis = System.currentTimeMillis() + REPORTING_INTERVAL_MILLIS;
        nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS;
    }

    /**
     * Processes the records retrieved from the shard
     */
    @Override
    public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
        for (Record record : records) {
            // Process each record
            processRecord(record);
        }

        // Report once the reporting interval has elapsed
        if (System.currentTimeMillis() > nextReportingTimeInMillis) {
            reportStats();
            resetStats();
            nextReportingTimeInMillis = System.currentTimeMillis() + REPORTING_INTERVAL_MILLIS;
        }

        // Checkpoint once per checkpoint interval
        if (System.currentTimeMillis() > nextCheckpointTimeInMillis) {
            checkpoint(checkpointer);
            nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS;
        }
    }

    private void reportStats() {
        // Was "TODO: Implement method" in learning-module-1
        System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" +
                stockStats + "\n" +
                "****************************************************************\n");
    }

    private void resetStats() {
        // Was "TODO: Implement method" in learning-module-1
        stockStats = new StockStats();
    }

    private void processRecord(Record record) {
        // Was "TODO: Implement method" in learning-module-1
        // Deserialize the record's byte array into a StockTrade object
        StockTrade trade = StockTrade.fromJsonAsBytes(record.getData().array());
        if (trade == null) {
            LOG.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.getPartitionKey());
            return;
        }
        stockStats.addStockTrade(trade);
    }

    /**
     * Called when processing of the shard ends, or when this processor
     * stops responding and loses its lease
     */
    @Override
    public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
        LOG.info("Shutting down record processor for shard: " + kinesisShardId);
        // Checkpointing on reaching the end of the shard lets processing start on its child shards
        if (reason == ShutdownReason.TERMINATE) {
            checkpoint(checkpointer);
        }
    }

    private void checkpoint(IRecordProcessorCheckpointer checkpointer) {
        LOG.info("Checkpointing shard " + kinesisShardId);
        try {
            checkpointer.checkpoint();
        } catch (ShutdownException se) {
            // Skip the checkpoint when this processor instance has been shut down (failover)
            LOG.info("Caught shutdown exception, skipping checkpoint.", se);
        } catch (ThrottlingException e) {
            // Skip the checkpoint when throttled. In production, consider backoff and retry.
            LOG.error("Caught throttling exception, skipping checkpoint.", e);
        } catch (InvalidStateException e) {
            // Thrown when there is a problem with the DynamoDB table
            LOG.error("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", e);
        }
    }
}
```
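`processRecords()` uses the same "run once every interval" check twice, for reporting and for checkpointing. A minimal sketch of that pattern pulled out into a hypothetical `IntervalTrigger` helper (not part of the tutorial or the KCL), with the clock injected so the behavior can be tested without waiting:

```java
import java.util.function.LongSupplier;

/**
 * Hypothetical helper mirroring the nextReportingTimeInMillis /
 * nextCheckpointTimeInMillis logic in processRecords().
 */
class IntervalTrigger {
    private final long intervalMillis;
    private final LongSupplier clock;
    private long nextTimeMillis;

    IntervalTrigger(long intervalMillis, LongSupplier clock) {
        this.intervalMillis = intervalMillis;
        this.clock = clock;
        // Like initialize(): the first deadline is one interval from now
        this.nextTimeMillis = clock.getAsLong() + intervalMillis;
    }

    /** Returns true, and schedules the next deadline, once the current deadline has passed. */
    boolean poll() {
        long now = clock.getAsLong();
        if (now > nextTimeMillis) {
            nextTimeMillis = now + intervalMillis;
            return true;
        }
        return false;
    }
}
```

Because the check only runs when `processRecords()` is invoked, a report or checkpoint can come later than the interval when the stream is quiet.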
This class is implemented to satisfy the `IRecordProcessor` interface, which consists of:

```java
public void initialize(String shardId)
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer)
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)
```
(* The interface used in this tutorial is version 1; there is now a more refined [version 2](https://docs.aws.amazon.com/ja_jp/streams/latest/dev/kinesis-record-processor-implementation-app-java.html#kcl-java-interface-v2).)
Of these, the methods called inside `processRecords()` were the ones the tutorial had us implement.
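Together, `processRecord()`, `reportStats()` and `resetStats()` form a per-interval cycle: fold each trade into the stats, print them, then start over with a fresh object. A minimal sketch of that cycle using `MiniStockStats`, a hypothetical, heavily simplified stand-in for the tutorial's `StockStats` that only counts trades per ticker (the real class in the repository tracks more detail):

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical simplified stand-in for StockStats: trade counts per ticker symbol. */
class MiniStockStats {
    private final Map<String, Long> countsByTicker = new HashMap<>();

    /** Roughly what processRecord() does after parsing: add one trade to the stats. */
    void addTrade(String tickerSymbol) {
        countsByTicker.merge(tickerSymbol, 1L, Long::sum);
    }

    long count(String tickerSymbol) {
        return countsByTicker.getOrDefault(tickerSymbol, 0L);
    }

    /** Most traded ticker in the current interval, or null if nothing was recorded. */
    String mostPopular() {
        String best = null;
        long bestCount = 0;
        for (Map.Entry<String, Long> e : countsByTicker.entrySet()) {
            if (e.getValue() > bestCount) {
                best = e.getKey();
                bestCount = e.getValue();
            }
        }
        return best;
    }
}
```

Resetting is done the same way as in `resetStats()`: by replacing the object with a new instance rather than clearing it in place.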
In other words, with the KCL even a simple application like this can be built just by working on `processRecords()`. The tutorial also lists benefits such as:

> When a new record becomes available, the KCL retrieves the record and calls the record processor, so you don't have to worry about how to retrieve records from Kinesis Data Streams. You also don't have to worry about the number of shards or consumer instances. As the stream scales up, you don't have to rewrite your application to handle multiple shards or consumer instances.
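To make the division of labor concrete, here is a toy model of how a worker drives record processors: one processor per shard, created through a factory, then `initialize` → `processRecords` (repeatedly) → `shutdown`. `ToyWorker` and `ToyRecordProcessor` are hypothetical and only mirror the shape of `IRecordProcessor`; they are not the KCL API. Leases, checkpointing and concurrency across shards are all omitted, whereas the real KCL processes shards in parallel.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

/** Toy model of the KCL 1.x record processor lifecycle (NOT the real KCL). */
class ToyWorker {
    /** Mirrors the shape of IRecordProcessor, with records simplified to strings. */
    interface ToyRecordProcessor {
        void initialize(String shardId);
        void processRecords(List<String> records);
        void shutdown(String reason);
    }

    /** Drives one processor per shard, created via the factory (cf. IRecordProcessorFactory). */
    static void run(Map<String, List<List<String>>> batchesByShard,
                    Supplier<ToyRecordProcessor> factory) {
        for (Map.Entry<String, List<List<String>>> shard : batchesByShard.entrySet()) {
            ToyRecordProcessor processor = factory.get();
            processor.initialize(shard.getKey());
            for (List<String> batch : shard.getValue()) {
                processor.processRecords(batch);
            }
            // The toy treats every shard as fully read, like ShutdownReason.TERMINATE
            processor.shutdown("TERMINATE");
        }
    }
}
```

The user code only supplies the processor; fetching the batches and calling the lifecycle methods is the worker's job, which is the benefit the quoted passage describes.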
The tutorial handles the `List<Record>` as follows:

```java
for (Record record : records) {
    processRecord(record);
}
...
StockTrade trade = StockTrade.fromJsonAsBytes(record.getData().array());
```
The `Record` class itself is described in the AWS SDK documentation.
Calling `getData()` returns a `ByteBuffer`, so it is converted to a byte array with [`array()`](https://docs.oracle.com/javase/6/docs/api/java/nio/ByteBuffer.html#array()) and then turned into a `StockTrade` object by [`fromJsonAsBytes()`](https://github.com/aws-samples/amazon-kinesis-learning/blob/master/src/com/amazonaws/services/kinesis/samples/stocktrades/model/StockTrade.java#L88), which internally reads it with Jackson's [`readValue()`](https://fasterxml.github.io/jackson-databind/javadoc/2.7/com/fasterxml/jackson/databind/ObjectMapper.html#readValue(byte[],%20java.lang.Class)).
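One caveat about `array()`: it returns the entire backing array, ignoring the buffer's position, limit and `arrayOffset()`, and it throws for buffers without an accessible array (`ReadOnlyBufferException` for read-only buffers, `UnsupportedOperationException` for direct ones). The tutorial's code works as written, but relying on `array()` assumes the buffer wraps exactly the record bytes. A defensive alternative is to copy just the readable bytes; `ByteBufferBytes` below is a hypothetical helper, not from the tutorial:

```java
import java.nio.ByteBuffer;

/** Hypothetical helper: copies exactly the readable bytes (position..limit) of a ByteBuffer. */
class ByteBufferBytes {
    static byte[] readableBytes(ByteBuffer buffer) {
        // duplicate() shares the content but has its own position,
        // so reading from it leaves the caller's buffer untouched
        ByteBuffer view = buffer.duplicate();
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return bytes;
    }
}
```

In the consumer this would read `StockTrade.fromJsonAsBytes(ByteBufferBytes.readableBytes(record.getData()))`.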
A checkpoint marks a specific position in the stream. (The page [Details of data written to DynamoDB](https://docs.aws.amazon.com/ja_jp/streams/latest/dev/kinesis-record-processor-ddb.html#kinesis-record-processor-ddb-table-contents) explains this clearly.)
The `checkpointer` that the worker passes in each time `processRecords()` is called is used to record how far the worker has processed (the code is probably [around here](https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointer.java)). This tutorial records the processing position once every fixed interval (`CHECKPOINT_INTERVAL_MILLIS`). A real production deployment would likely use a different strategy.
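One such strategy is the one hinted at in the `ThrottlingException` branch of `checkpoint()`: retry with backoff instead of skipping. A minimal sketch, where `Checkpointer` and `ThrottledException` are hypothetical stand-ins for `IRecordProcessorCheckpointer` and the KCL's `ThrottlingException` (so the logic can run without the KCL), and the attempt count and initial delay are arbitrary illustrative parameters:

```java
/**
 * Sketch: checkpoint with exponential backoff on throttling.
 * Checkpointer / ThrottledException are hypothetical stand-ins, not KCL types.
 */
class CheckpointRetry {
    interface Checkpointer {
        void checkpoint() throws ThrottledException;
    }

    static class ThrottledException extends Exception {
        ThrottledException(String message) {
            super(message);
        }
    }

    /** Returns true if the checkpoint succeeded within maxAttempts attempts. */
    static boolean checkpointWithBackoff(Checkpointer checkpointer, int maxAttempts,
                                         long initialBackoffMillis) {
        long backoff = initialBackoffMillis;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                checkpointer.checkpoint();
                return true;
            } catch (ThrottledException e) {
                if (attempt == maxAttempts) {
                    break; // give up; the next checkpoint interval will try again
                }
                try {
                    Thread.sleep(backoff);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // preserve the interrupt and stop retrying
                    return false;
                }
                backoff *= 2; // exponential backoff
            }
        }
        return false;
    }
}
```

Giving up after a bounded number of attempts is acceptable here because a missed checkpoint only means that, after a failover, some records are processed again from the previous checkpoint.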
This completes the three tutorials. They show how to use the PutRecord API in the AWS SDK for Java and how to use the KCL in a simple way. I got the overall picture, but my impression is that the detailed usage can't be learned from the tutorials alone; you need to dig into the documentation and the source code itself.
This tutorial appears to have been written quite a while ago, and the interfaces and versions it uses are old. So next time, I plan to try updating various parts of this tutorial's code.