Kinesis Data Streams from zero Java experience (3.2)

This part continues with the tutorial "Tutorial: Real-time analysis of stock data using Kinesis Data Streams".

The learning-module-1 branch used for the tutorial has gaps: several methods are left as stubs that contain only

// TODO: Implement method

I will work from this branch (it apparently cannot be built in its current state) and compare it with the master branch as I go.

Producer implementation

Start with [Step 4: Implement the Producer](https://docs.aws.amazon.com/ja_jp/streams/latest/dev/learning-kinesis-module-one-producer.html). This part writes records with the AWS SDK for Java (the KPL is not used). As described in the tutorial, the producer works as follows:

  1. Take the stream name and region name as input
  2. Create a ClientBuilder
  3. Set the region, credentials, and client configuration on it
  4. Build the Kinesis client from the ClientBuilder
  5. Check the status of the stream
  6. Send a random stock trade to the stream every 100 ms

The only part left unimplemented in the producer is the sendStockTrade() method of writer.StockTradesWriter.
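Step 6 of the list boils down to a loop that keeps handing random trades to sendStockTrade(). The following is a minimal sketch of that loop only, not the tutorial's code: `Trade` and `TradeSink` are hypothetical stand-ins for `StockTrade` and the Kinesis client so that it runs without AWS credentials, and the ticker list and price range are made up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical stand-in for StockTrade: only a ticker symbol and a price.
final class Trade {
    final String tickerSymbol;
    final double price;

    Trade(String tickerSymbol, double price) {
        this.tickerSymbol = tickerSymbol;
        this.price = price;
    }
}

// Hypothetical stand-in for sendStockTrade(trade, kinesisClient, streamName).
interface TradeSink {
    void send(Trade trade);
}

final class ProducerLoop {
    private static final String[] TICKERS = {"AAPL", "AMZN", "MSFT"}; // made-up sample

    // Sends up to `count` random trades, pausing `intervalMillis` after each one
    // (the tutorial uses 100 ms and loops until the process is stopped).
    // Returns the number of trades actually sent.
    static int run(TradeSink sink, int count, long intervalMillis, Random random) {
        for (int i = 0; i < count; i++) {
            String ticker = TICKERS[random.nextInt(TICKERS.length)];
            double price = 10 + random.nextDouble() * 90; // arbitrary range [10, 100)
            sink.send(new Trade(ticker, price));
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt flag and stop early
                return i + 1;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        List<Trade> sent = new ArrayList<>();
        int n = run(sent::add, 5, 100, new Random(42));
        System.out.println("sent " + n + " trades");
        for (Trade t : sent) {
            System.out.printf("%s %.2f%n", t.tickerSymbol, t.price);
        }
    }
}
```

Passing `sent::add` as the sink keeps the loop testable; in the real producer the sink would call the Kinesis client, and the loop would not stop after a fixed count.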

writer.StockTradesWriter.sendStockTrade()

/**
 * Sends stock trade information to the given stream using the Kinesis client
 *
 * @param trade Instance representing a stock trade
 * @param kinesisClient Kinesis client
 * @param streamName Stream name
 */
private static void sendStockTrade(StockTrade trade, AmazonKinesis kinesisClient,
            String streamName) {
    byte[] bytes = trade.toJsonAsBytes();
    // toJsonAsBytes() returns null if Jackson (the JSON library) fails to serialize the trade
    if (bytes == null) {
        LOG.warn("Could not get JSON bytes for stock trade");
        return;
    }

    LOG.info("Putting trade: " + trade.toString());
    PutRecordRequest putRecord = new PutRecordRequest();
    putRecord.setStreamName(streamName);
    // Use the ticker symbol as the partition key
    putRecord.setPartitionKey(trade.getTickerSymbol());
    putRecord.setData(ByteBuffer.wrap(bytes));

    try {
        kinesisClient.putRecord(putRecord);
    } catch (AmazonClientException ex) {
        LOG.warn("Error sending record to Amazon Kinesis.", ex);
    }
}
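The ticker symbol passed to setPartitionKey() in the code above decides which shard receives the record: Kinesis hashes the partition key with MD5 into a 128-bit integer and routes the record to the shard whose hash key range contains that value. The sketch below reproduces that mapping under stated assumptions: the key's UTF-8 bytes are hashed, and the shards split the hash key space evenly (roughly what a newly created, never-resharded stream looks like). For a real stream you would compare against each shard's actual hash key range instead. `ShardMapper` is a hypothetical helper, not an SDK class.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

final class ShardMapper {
    // Size of the Kinesis hash key space: 2^128.
    private static final BigInteger KEY_SPACE = BigInteger.ONE.shiftLeft(128);

    // MD5 of the UTF-8 partition key, read as an unsigned 128-bit integer.
    static BigInteger hashKey(String partitionKey) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            return new BigInteger(1, digest); // signum 1: treat the bytes as unsigned
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is required on every Java platform", e);
        }
    }

    // Index of the shard owning the key, ASSUMING shardCount shards that split
    // [0, 2^128) into equal ranges; the last shard absorbs any remainder.
    static int shardIndexFor(String partitionKey, int shardCount) {
        if (shardCount <= 0) throw new IllegalArgumentException("shardCount must be positive");
        BigInteger rangeSize = KEY_SPACE.divide(BigInteger.valueOf(shardCount));
        int index = hashKey(partitionKey).divide(rangeSize).intValue();
        return Math.min(index, shardCount - 1);
    }

    public static void main(String[] args) {
        for (String ticker : new String[] {"AAPL", "AMZN", "MSFT"}) {
            System.out.println(ticker + " -> shard " + shardIndexFor(ticker, 4));
        }
    }
}
```

Because every trade for one ticker hashes to the same value, all of them land on the same shard; with only a handful of tickers, some shards may receive far more traffic than others, which is why many distinct partition keys per shard are recommended.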

- Because the PutRecord API takes the data as a byte array, the object representing the stock trade is converted into one with [toJsonAsBytes()](https://github.com/aws-samples/amazon-kinesis-learning/blob/master/src/com/amazonaws/services/kinesis/samples/stocktrades/model/StockTrade.java#L80) (internally, this calls [writeValueAsBytes()](https://fasterxml.github.io/jackson-databind/javadoc/2.7/com/fasterxml/jackson/databind/ObjectMapper.html#writeValueAsBytes(java.lang.Object)) of jackson.databind.ObjectMapper).
- The [ticker symbol](https://ja.wikipedia.org/wiki/%E3%83%86%E3%82%A3%E3%83%83%E3%82%AB%E3%83%BC%E3%82%B7%E3%83%B3%E3%83%9C%E3%83%AB) is used as the partition key. According to the tutorial, the guideline is hundreds or thousands of partition keys per shard (for how the destination shard is determined, see the previous article, leo-mon/items/45602438bb3e9ad220ca#%E5%87%BA%E5%8A%9B%E3%83%91%E3%83%A9%E3%83%A1%E3%83%BC%E3%82%BF%E3%81%AB%E3%81%A4%E3%81%84%E3%81%A6).

- Write failures, such as exceeding the per-shard write limit, hitting the API call limit, or network connection errors, need to be handled (here the call is simply wrapped in try ... catch).
- This tutorial uses the PutRecord API, but if a large number of records are generated, consider the PutRecords API, which sends multiple records in one call (for details, see "Adding Data to a Stream").
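When switching to PutRecords, one constraint to plan for is that a single request accepts at most 500 records (there are also per-record and per-request size limits, which this sketch ignores). Below is a minimal, SDK-free sketch of splitting outgoing records into request-sized batches; `Batcher` is a hypothetical helper, and each inner list would become one PutRecords request. PutRecords can also partially fail, so the response's failed record count should be checked and the failed entries retried.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class Batcher {
    // PutRecords accepts at most 500 records per request.
    static final int MAX_RECORDS_PER_PUT = 500;

    // Splits items into consecutive batches of at most maxPerBatch elements.
    static <T> List<List<T>> batches(List<T> items, int maxPerBatch) {
        if (maxPerBatch <= 0) throw new IllegalArgumentException("maxPerBatch must be positive");
        List<List<T>> result = new ArrayList<>();
        for (int start = 0; start < items.size(); start += maxPerBatch) {
            int end = Math.min(start + maxPerBatch, items.size());
            // Copy so each batch is independent of the source list.
            result.add(new ArrayList<>(items.subList(start, end)));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> records = new ArrayList<>(Collections.nCopies(1201, 0));
        for (List<Integer> batch : batches(records, MAX_RECORDS_PER_PUT)) {
            System.out.println("batch of " + batch.size()); // 500, 500, 201
        }
    }
}
```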

Consumer implementation

Next comes [Step 5: Implement the Consumer](https://docs.aws.amazon.com/ja_jp/streams/latest/dev/learning-kinesis-module-one-consumer.html). This part also has unimplemented gaps.

To understand them, I will follow the code starting from the [processor.StockTradesProcessor](https://github.com/aws-samples/amazon-kinesis-learning/blob/learning-module-1/src/com/amazonaws/services/kinesis/samples/stocktrades/processor/StockTradesProcessor.java) class, which calls them.

processor.StockTradesProcessor

Here is the important part of its main() method:

public static void main(String[] args) throws Exception {

    ...

    //KCL settings
    KinesisClientLibConfiguration kclConfig =
            new KinesisClientLibConfiguration(applicationName, streamName, credentialsProvider, workerId)
        .withRegionName(region.getName())
        .withCommonClientConfig(ConfigurationUtils.getClientConfigWithUserAgent());
    
    // Factory that creates an IRecordProcessor instance for each shard
    IRecordProcessorFactory recordProcessorFactory = new StockTradeRecordProcessorFactory();

    //Creating a worker
    Worker worker = new Worker(recordProcessorFactory, kclConfig);

    int exitCode = 0;
    try {
        // Run the worker (blocks until it shuts down)
        worker.run();
    } catch (Throwable t) {
        LOG.error("Caught throwable while processing data.", t);
        exitCode = 1;
    }
    System.exit(exitCode);

}

Running the KCL breaks down roughly into three steps:

  1. **Configure it through KinesisClientLibConfiguration**
  2. **Create a factory that returns instances of a class implementing the IRecordProcessor interface**
  3. **Create a worker from 1 and 2 and run it**

Of these, the user implements step 2: the class implementing the IRecordProcessor interface and its factory (here, StockTradeRecordProcessor and StockTradeRecordProcessorFactory).
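Why a factory rather than a single processor object? The worker can hold leases on several shards and creates one record processor per shard, calling initialize, then processRecords repeatedly, then shutdown on each. The toy simulation below mimics only that lifecycle; `MiniRecordProcessor`, `CountingProcessor`, and `MiniWorker` are hypothetical types, not KCL classes, and there are no leases, DynamoDB, or threads.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical, simplified mirror of the v1 IRecordProcessor lifecycle.
interface MiniRecordProcessor {
    void initialize(String shardId);
    void processRecords(List<String> records);
    void shutdown();
}

// Each instance keeps per-shard state, which is why the worker needs a factory.
final class CountingProcessor implements MiniRecordProcessor {
    String shardId;
    int processed;

    public void initialize(String shardId) { this.shardId = shardId; }
    public void processRecords(List<String> records) { processed += records.size(); }
    public void shutdown() { System.out.println(shardId + " processed " + processed); }
}

final class MiniWorker {
    // Creates one processor per shard via the factory and drives its lifecycle.
    static Map<String, MiniRecordProcessor> run(Supplier<MiniRecordProcessor> factory,
                                                Map<String, List<List<String>>> batchesByShard) {
        Map<String, MiniRecordProcessor> processors = new LinkedHashMap<>();
        for (Map.Entry<String, List<List<String>>> shard : batchesByShard.entrySet()) {
            MiniRecordProcessor p = factory.get();
            p.initialize(shard.getKey());
            for (List<String> batch : shard.getValue()) {
                p.processRecords(batch);
            }
            p.shutdown();
            processors.put(shard.getKey(), p);
        }
        return processors;
    }

    public static void main(String[] args) {
        Map<String, List<List<String>>> input = new LinkedHashMap<>();
        input.put("shardId-000000000000", Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("c")));
        input.put("shardId-000000000001", Arrays.asList(Arrays.asList("d")));
        run(CountingProcessor::new, input);
    }
}
```

If a single processor instance were shared, the per-shard fields (such as kinesisShardId and the stats in StockTradeRecordProcessor) would be mixed across shards.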

processor.StockTradeRecordProcessor

The methods that were left blank are [processor.StockTradeRecordProcessor.reportStats()](https://github.com/aws-samples/amazon-kinesis-learning/blob/learning-module-1/src/com/amazonaws/services/kinesis/samples/stocktrades/processor/StockTradeRecordProcessor.java#L88), [processor.StockTradeRecordProcessor.resetStats()](https://github.com/aws-samples/amazon-kinesis-learning/blob/learning-module-1/src/com/amazonaws/services/kinesis/samples/stocktrades/processor/StockTradeRecordProcessor.java#L92), and [processor.StockTradeRecordProcessor.processRecord()](/amazon-kinesis-learning/blob/learning-module-1/src/com/amazonaws/services/kinesis/samples/stocktrades/processor/StockTradeRecordProcessor.java#L96), all in this class.

StockTradeRecordProcessor.java


public class StockTradeRecordProcessor implements IRecordProcessor {

    ...

    /**
     * Initialization method; receives the ID of the shard this instance will process
     */
    @Override
    public void initialize(String shardId) {
        LOG.info("Initializing record processor for shard: " + shardId);
        this.kinesisShardId = shardId;
        nextReportingTimeInMillis = System.currentTimeMillis() + REPORTING_INTERVAL_MILLIS;
        nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS;
    }

    /**
     * Processes a batch of records retrieved from the shard
     */
    @Override
    public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
        for (Record record : records) {
            //Process record
            processRecord(record);
        }

        // Report (and reset) stats once the reporting interval has elapsed
        if (System.currentTimeMillis() > nextReportingTimeInMillis) {
            reportStats();
            resetStats();
            nextReportingTimeInMillis = System.currentTimeMillis() + REPORTING_INTERVAL_MILLIS;
        }

        // Checkpoint once the checkpoint interval has elapsed
        if (System.currentTimeMillis() > nextCheckpointTimeInMillis) {
            checkpoint(checkpointer);
            nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS;
        }
    }
    private void reportStats() {
        // TODO: Implement method
        System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" +
                stockStats + "\n" +
                "****************************************************************\n");
    }
    private void resetStats() {
        // TODO: Implement method
        stockStats = new StockStats();
    }
    private void processRecord(Record record) {
        // TODO: Implement method
        // Deserialize the record's data (JSON bytes) into a StockTrade object
        StockTrade trade = StockTrade.fromJsonAsBytes(record.getData().array());
        if (trade == null) {
            LOG.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.getPartitionKey());
            return;
        }
        stockStats.addStockTrade(trade);
    }

    /**
     * Called when processing of the shard ends, or when this processor loses its lease (e.g., the worker stopped responding)
     */
    @Override
    public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
        LOG.info("Shutting down record processor for shard: " + kinesisShardId);
        // At the end of the shard, checkpointing lets processing of its child shards begin
        if (reason == ShutdownReason.TERMINATE) {
            checkpoint(checkpointer);
        }
    }

    private void checkpoint(IRecordProcessorCheckpointer checkpointer) {
        LOG.info("Checkpointing shard " + kinesisShardId);
        try {
            checkpointer.checkpoint();
        } catch (ShutdownException se) {
            // This processor instance has been shut down (e.g., failover); skip the checkpoint
            LOG.info("Caught shutdown exception, skipping checkpoint.", se);
        } catch (ThrottlingException e) {
            // Throttled: skip this checkpoint. In production, consider backing off and retrying.
            LOG.error("Caught throttling exception, skipping checkpoint.", e);
        } catch (InvalidStateException e) {
            // Thrown when there is a problem with the DynamoDB table that stores checkpoints
            LOG.error("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", e);
        }
    }

}
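The ThrottlingException branch above only logs and skips, with a note to consider backoff and retry in production. One common approach is exponential backoff with a capped number of attempts. The following is a minimal, SDK-free sketch; `Retry.withBackoff`, the attempt count, and the base delay are illustrative assumptions, and the sleep function is injectable so the behavior can be checked without waiting.

```java
import java.util.concurrent.Callable;
import java.util.function.LongConsumer;
import java.util.function.Predicate;

final class Retry {
    // Runs `action`; if it throws an exception accepted by `isRetryable`, waits
    // baseDelayMillis * 2^(n-1) after the n-th failure and tries again, up to
    // maxAttempts calls in total. Non-retryable or final failures are rethrown.
    static <T> T withBackoff(Callable<T> action, Predicate<Exception> isRetryable,
                             int maxAttempts, long baseDelayMillis, LongConsumer sleeper)
            throws Exception {
        if (maxAttempts < 1) throw new IllegalArgumentException("maxAttempts must be >= 1");
        for (int attempt = 1; ; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                if (attempt >= maxAttempts || !isRetryable.test(e)) throw e;
                sleeper.accept(baseDelayMillis << (attempt - 1));
            }
        }
    }

    // Sleeper for real use; tests can pass a recording LongConsumer instead.
    static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted during backoff", e);
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        String result = withBackoff(() -> {
            if (++calls[0] < 3) throw new IllegalStateException("throttled");
            return "ok";
        }, e -> e instanceof IllegalStateException, 5, 100, Retry::sleep);
        System.out.println(result + " after " + calls[0] + " calls"); // ok after 3 calls
    }
}
```

In the record processor, this might be used as `Retry.withBackoff(() -> { checkpointer.checkpoint(); return null; }, e -> e instanceof ThrottlingException, 3, 500, Retry::sleep)`, so that a ShutdownException is still rethrown immediately to the existing handlers. The same helper could wrap the producer's putRecord() call for retryable errors.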

This class implements the IRecordProcessor interface, which consists of the following methods:

public void initialize(String shardId)
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer)
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) 

(* The interface used in this tutorial is version 1; a more refined version 2 is now available (see latest/dev/kinesis-record-processor-implementation-app-java.html#kcl-java-interface-v2).)

Of these, what the tutorial had us implement were the methods called inside processRecords(). With the KCL, even an application like this can be built just by working on processRecords(). The KCL also brings benefits such as the following:

When a new record becomes available, KCL retrieves the record and calls the record processor so you don't have to worry about how to retrieve the record from Kinesis Data Streams. You also don't have to worry about the number of shards or consumer instances. As the stream scales up, you don't have to rewrite your application to handle multiple shards or consumer instances.

Supplement

Handling the received List<Record>

This tutorial handles it as follows:

for (Record record : records) {
    processRecord(record);
}
...
StockTrade trade = StockTrade.fromJsonAsBytes(record.getData().array());

The Record class itself is described in the AWS SDK documentation. Calling getData() returns a ByteBuffer, so it is turned into a byte array with [array()](https://docs.oracle.com/javase/6/docs/api/java/nio/ByteBuffer.html#array()) and then converted by [fromJsonAsBytes()](https://github.com/aws-samples/amazon-kinesis-learning/blob/master/src/com/amazonaws/services/kinesis/samples/stocktrades/model/StockTrade.java#L88) into a StockTrade object that the code can work with (internally, this uses Jackson's [readValue()](https://fasterxml.github.io/jackson-databind/javadoc/2.7/com/fasterxml/jackson/databind/ObjectMapper.html#readValue(byte[],%20java.lang.Class))).
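One caveat about `record.getData().array()`: `ByteBuffer.array()` returns the entire backing array and ignores the buffer's position and limit, and it throws if the buffer is read-only (`ReadOnlyBufferException`) or has no accessible array, as with a direct buffer (`UnsupportedOperationException`). It works in this tutorial, but a more defensive sketch copies exactly the remaining bytes. `Bytes.remainingBytes` is a hypothetical helper; `duplicate()` is used so the original buffer's position is left untouched.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class Bytes {
    // Copies the bytes between position and limit without moving the
    // original buffer's position (duplicate() has its own position/limit).
    static byte[] remainingBytes(ByteBuffer buffer) {
        ByteBuffer view = buffer.duplicate();
        byte[] out = new byte[view.remaining()];
        view.get(out);
        return out;
    }

    public static void main(String[] args) {
        byte[] backing = "xx{\"tickerSymbol\":\"AAPL\"}yy".getBytes(StandardCharsets.UTF_8);
        // A buffer that covers only the JSON in the middle of a larger array.
        ByteBuffer data = ByteBuffer.wrap(backing, 2, backing.length - 4);

        System.out.println(new String(data.array(), StandardCharsets.UTF_8));         // whole array, including xx and yy
        System.out.println(new String(remainingBytes(data), StandardCharsets.UTF_8)); // just the JSON
    }
}
```

With this helper, the deserialization step would read `StockTrade.fromJsonAsBytes(Bytes.remainingBytes(record.getData()))`.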

About checkpoints

A checkpoint marks a specific position in the stream (the page on the [data written to DynamoDB](https://docs.aws.amazon.com/ja_jp/streams/latest/dev/kinesis-record-processor-ddb.html#kinesis-record-processor-ddb-table-contents) was easy to understand). The checkpointer that the worker passes each time it calls processRecords() records how far the worker has processed (the code is probably [around here](https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointer.java)). This tutorial records the processing position every fixed number of seconds; production use would likely call for a different strategy.
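The "every fixed number of seconds" pattern in processRecords() (compare the current time with nextCheckpointTimeInMillis, act, then push the deadline forward) can be isolated and tested on its own. A minimal sketch with a hypothetical `IntervalGate` class and an injectable clock (a `LongSupplier`), so the timing can be checked without waiting; another strategy, such as checkpointing every N records, would replace the gate's condition.

```java
import java.util.function.LongSupplier;

// Mirrors the nextCheckpointTimeInMillis logic from StockTradeRecordProcessor:
// fire once the deadline has passed, then schedule the next deadline from "now".
final class IntervalGate {
    private final long intervalMillis;
    private final LongSupplier clock;
    private long nextDeadlineMillis;

    IntervalGate(long intervalMillis, LongSupplier clock) {
        this.intervalMillis = intervalMillis;
        this.clock = clock;
        this.nextDeadlineMillis = clock.getAsLong() + intervalMillis;
    }

    // Returns true at most once per interval; call it after each processRecords() batch.
    boolean tryFire() {
        long now = clock.getAsLong();
        if (now > nextDeadlineMillis) {
            nextDeadlineMillis = now + intervalMillis;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        long[] fakeNow = {0};
        IntervalGate checkpointGate = new IntervalGate(60_000, () -> fakeNow[0]);
        for (long t : new long[] {10_000, 60_000, 60_001, 70_000, 120_002}) {
            fakeNow[0] = t;
            System.out.println(t + " ms -> checkpoint? " + checkpointGate.tryFire());
        }
    }
}
```

In the real processor the clock would be `System::currentTimeMillis`. Note that the gate only fires when processRecords() is called, so if no records arrive, no checkpoint is taken either.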


This completes the three parts of the tutorial. Through them, you can learn how to use the PutRecord API in the AWS SDK for Java and how to use the KCL in a simple way. While I was able to grasp the outline, I got the impression that the details cannot be learned from the tutorial alone and that you need to dig into the documentation and the source code itself.

This tutorial appears to have been written quite a while ago, and the interfaces and library versions it uses are also old. So next time, I plan to try updating various parts of it.
