[Details] Implementation of consumer applications with Kinesis Client Library for Java

Introduction

**Kinesis Client Library (KCL)** is a library provided by AWS for implementing consumer applications that receive and process the data (records) flowing through an *AWS Kinesis Data Stream*.

There are two ways to process Kinesis records:

- use **KCL**, or
- call the Kinesis Data Streams API directly via the AWS SDK.

This article covers the former, **KCL**.

However, it is hard to tell from the documentation when the **record processor** is called and what happens when the **record processor** throws an error. If you get this wrong, Kinesis records can remain stuck in the stream or be discarded unintentionally.

This article supplements points that are not spelled out in the official AWS Kinesis Data Streams guide and explains how to implement a consumer application using **KCL**.

Note that **KCL** has libraries for *Java*, *JavaScript (Node.js)*, *Python*, *.NET*, and *Ruby*; this article deals with the library for *Java* (module version 1.x, interface version v2; details below).

What is Kinesis Client Library?

Functions of KCL

Retrieving records

Normally, getting records from a Kinesis stream requires steps such as getting the shards of the target stream, getting an iterator to scan the records in each shard, and looping through that iterator. (For details, see [Developing Consumers Using the Kinesis Data Streams API with the AWS SDK for Java](https://docs.aws.amazon.com/ja_jp/streams/latest/dev/developing-consumers-with-sdk.html).)
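
For illustration only, a minimal sketch of that manual approach with the AWS SDK for Java (v1) might look like the following. The stream name "example-stream" is an assumption, and resharding detection, error handling, and per-shard concurrency are all omitted.

import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.model.*;

public class ManualConsumerSketch {

    public static void main(String... args) throws InterruptedException {
        AmazonKinesis kinesis = AmazonKinesisClientBuilder.defaultClient();

        // 1. Get the shards of the target stream.
        for (Shard shard : kinesis.describeStream(
                new DescribeStreamRequest().withStreamName("example-stream"))
                .getStreamDescription().getShards()) {

            // 2. Get an iterator to scan the records in the shard.
            String iterator = kinesis.getShardIterator(new GetShardIteratorRequest()
                    .withStreamName("example-stream")
                    .withShardId(shard.getShardId())
                    .withShardIteratorType("TRIM_HORIZON"))
                    .getShardIterator();

            // 3. Loop through the iterator and process the records.
            //    (A real consumer would poll each shard in its own thread.)
            while (iterator != null) {
                GetRecordsResult result = kinesis.getRecords(
                        new GetRecordsRequest().withShardIterator(iterator).withLimit(100));
                result.getRecords().forEach(record -> System.out.println(record));
                iterator = result.getNextShardIterator();
                Thread.sleep(1000L); // avoid hitting the GetRecords rate limit
            }
        }
    }
}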

**KCL** does all of this automatically and hands the records to your application.

Managing the record processing position

A Kinesis stream is, as the name implies, a stream; unlike a queue, you cannot delete records you have retrieved. Records remain in the stream until the retention period expires.
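
For reference, the default retention period is 24 hours and can be extended via the AWS SDK. A minimal sketch, assuming a stream named "kcl-sample" (the name used in the App example later) and an example value of 48 hours:

import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.model.IncreaseStreamRetentionPeriodRequest;

public class RetentionExample {

    public static void main(String... args) {
        AmazonKinesis kinesis = AmazonKinesisClientBuilder.defaultClient();

        // Extend the retention period of the stream from the default 24 hours to 48 hours.
        kinesis.increaseStreamRetentionPeriod(new IncreaseStreamRetentionPeriodRequest()
                .withStreamName("kcl-sample")
                .withRetentionPeriodHours(48));
    }
}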

Therefore, if the application does not keep track of how far it has processed the records in the stream, then when it is stopped and restarted it will either reprocess all the records in the stream from the beginning (TRIM_HORIZON) or discard all the records that arrived while it was stopped (LATEST).

**KCL** keeps track of which records have been processed, and when the application is stopped and restarted, it resumes processing from the records that have not yet been processed.
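
Where processing starts when no checkpoint has been recorded yet is controlled by the initial position in KinesisClientLibConfiguration. A minimal sketch (the application and stream names match the App example later in this article; "worker-1" is a placeholder):

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;

public class InitialPositionExample {

    static KinesisClientLibConfiguration config() {
        return new KinesisClientLibConfiguration(
                "kcl-java-example",                               // application name
                "kcl-sample",                                     // stream name
                DefaultAWSCredentialsProviderChain.getInstance(),
                "worker-1")
                // Used only when no checkpoint exists yet:
                // TRIM_HORIZON = oldest available record, LATEST = only newly arriving records.
                .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);
    }
}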

Following shard increases and decreases

To scale a Kinesis stream you increase the number of shards, but if you implement record retrieval yourself as described above, you also have to implement a mechanism to detect that shards have been added (or removed).

**KCL** detects increases and decreases in the shards of the Kinesis stream and automatically starts processing newly opened shards.

Parallel processing of shards

Kinesis streams guarantee that records with the same partition key always flow to the same shard. (Set the partition key to an identifier that ties related records together, for example an order number.)

This means that related records are never scattered across multiple shards, so each shard can be processed in parallel, completely independently. (Conversely, increasing the number of shards would not make sense otherwise.)
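
As an aside, this routing is determined on the producer side by the partition key. A minimal producer sketch with the AWS SDK for Java (v1); the stream name and payloads are made up:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.model.PutRecordRequest;
import com.amazonaws.services.kinesis.model.PutRecordResult;

public class ProducerSketch {

    public static void main(String... args) {
        AmazonKinesis kinesis = AmazonKinesisClientBuilder.defaultClient();

        // Both records share the partition key "order-1001" (the order number),
        // so they land on the same shard and are seen in order by that shard's processor.
        for (String payload : new String[]{"order-1001 created", "order-1001 paid"}) {
            PutRecordResult result = kinesis.putRecord(new PutRecordRequest()
                    .withStreamName("kcl-sample")
                    .withPartitionKey("order-1001")
                    .withData(ByteBuffer.wrap(payload.getBytes(StandardCharsets.UTF_8))));
            System.out.println("shardId=" + result.getShardId());
        }
    }
}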

**KCL** processes each detected shard in parallel, in a separate thread.

Distributed processing across multiple processes

A **KCL** application normally handles all the shards of the target Kinesis stream in a single process, but as the number of shards grows, the degree of parallelism grows with it, and a single process may no longer be able to keep up.

In that case, you can launch multiple instances of the KCL application (multiple processes) against the same Kinesis stream to distribute the load across them.

KCL for Java versions

**KCL for Java** currently has the following versions. This article deals with **module version 1.x, interface version v2**.

| Module version | Interface version | Package | Remarks |
|---|---|---|---|
| 1.x [GitHub] | v1 | com.amazonaws.services.kinesis.clientlibrary.interfaces | Also called the "original interface" |
| 1.x [GitHub] | v2 | com.amazonaws.services.kinesis.clientlibrary.interfaces.v2 | The version covered in this article |
| 2.x [GitHub] | - | software.amazon.kinesis | Required if you want to take advantage of enhanced fan-out |

The differences between interface versions *v1* and *v2* of module version *1.x* are small; the main one is that the *v2* record processor methods receive container objects (InitializationInput, ProcessRecordsInput, ShutdownInput) instead of individual arguments.

Module versions *1.x* and *2.x* do not differ much in how you implement a **record processor**, but the API has been redesigned and is not compatible.

KCL concepts

KCL.jpg

Record processor

A handler that processes the records flowing through a Kinesis stream. The base interface defines three methods, for initialization, record processing, and termination, and the application developer implements these three methods.

A **record processor** is instantiated in the KCL application in one-to-one correspondence with the shards of the Kinesis stream and is responsible for processing the records flowing through its shard.
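
For reference, the *v2* record processor interface implemented by the samples in this article has the following shape; the three methods correspond to initialization, record processing, and termination.

// Shape of com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor
public interface IRecordProcessor {

    // Called once when this processor takes charge of a shard.
    void initialize(InitializationInput initializationInput);

    // Called repeatedly with batches of records read from the shard.
    void processRecords(ProcessRecordsInput processRecordsInput);

    // Called when the shard is closed or the lease for it is lost.
    void shutdown(ShutdownInput shutdownInput);
}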

Checkpoint

Indicates how far the records in a Kinesis stream (more precisely, in a shard) have been processed. Checkpoints are not recorded automatically; the application developer must record them at appropriate points in the **record processor**. (The component used to record checkpoints is called the **checkpointer**.)

**Checkpoints** are recorded in *DynamoDB*. When you run the KCL application, **KCL** automatically creates the table in *DynamoDB*.
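
The ExampleRecordProcessor shown later records one checkpoint per batch. A checkpoint can also be recorded per record; the following fragment is only a sketch, assuming the checkpoint(Record) overload of the checkpointer and a hypothetical handle() method for the application-specific processing.

@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
    for (Record record : processRecordsInput.getRecords()) {
        handle(record); // hypothetical application-specific processing
        try {
            // Marks everything up to and including this record as processed.
            processRecordsInput.getCheckpointer().checkpoint(record);
        } catch (Exception e) {
            // See "Error handling" below for how KCL treats exceptions thrown here.
            throw new RuntimeException("Checkpoint failed.", e);
        }
    }
}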

Worker

Manages the life cycle of record processors (creation and termination) according to the number of shards in the Kinesis stream. The application developer creates and starts a **worker** with the required parameters (such as which Kinesis stream to process).

There is only one **worker** in a KCL application.

Implementation of KCL application

Developing an application with **KCL** requires the following steps, described in order below: implementing a record processor, implementing a record processor factory, and creating and launching a worker.

Sample code

Sample code (Java / Scala / Kotlin) is available on GitHub.

Official sample code (Java) is also available from AWS.

Record processor implementation

First, implement a **record processor** to process the Kinesis records.

The methods of a record processor are never called concurrently from multiple threads, so the record processor can safely keep the information it needs in instance variables.

ExampleRecordProcessor


public class ExampleRecordProcessor implements IRecordProcessor {

    private final String tableName;

    ExampleRecordProcessor(String tableName) {
        this.tableName = tableName;
    }

    private String shardId;
    private AmazonDynamoDB dynamoDB;
    private Table table;

    @Override
    public void initialize(InitializationInput initializationInput) {
        shardId = initializationInput.getShardId();

        // Initialize any resources for #processRecords().
        dynamoDB = AmazonDynamoDBClientBuilder.defaultClient();
        table = new Table(dynamoDB, tableName);
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        // Processing incoming records.
        retry(() -> {
            processRecordsInput.getRecords().forEach(record -> {
                System.out.println(record);
            });
        });

        // Record checkpoint if all incoming records processed successfully.
        recordCheckpoint(processRecordsInput.getCheckpointer());
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        // Record checkpoint at closing shard if shutdown reason is TERMINATE.
        if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
            recordCheckpoint(shutdownInput.getCheckpointer());
        }

        // Cleanup initialized resources.
        Optional.ofNullable(dynamoDB).ifPresent(AmazonDynamoDB::shutdown);
    }

    private void recordCheckpoint(IRecordProcessorCheckpointer checkpointer) {
        retry(() -> {
            try {
                checkpointer.checkpoint();
            } catch (Throwable e) {
                throw new RuntimeException("Record checkpoint failed.", e);
            }
        });
    }

    private void retry(Runnable f) {
        try {
            f.run();
        } catch (Throwable e) {
            System.out.println(String.format("An error occurred %s. That will be retry...", e.getMessage()));
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e2) {
                e2.printStackTrace();
            }
            retry(f);
        }
    }
}

Record processor factory implementation

Next, implement the **record processor factory** that is used to create record processors. The **worker** uses this factory to create a record processor for each shard.

ExampleRecordProcessorFactory


public class ExampleRecordProcessorFactory implements IRecordProcessorFactory {

    private final String tableName;

    ExampleRecordProcessorFactory(String tableName) {
        this.tableName = tableName;
    }

    @Override
    public IRecordProcessor createProcessor() {
        return new ExampleRecordProcessor(tableName);
    }
}

Worker creation and launch

Finally, from the entry point (the *Main* class) of the KCL application, create and launch a **worker**.

Use com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.Builder to create the **worker**.

The read/write capacity of the DynamoDB table for checkpoints, which **KCL** creates automatically, defaults to 10. If you want to change it, specify it via KinesisClientLibConfiguration.

App


public class App {

    public static void main(String... args) {

        // Create a Worker.
        final Worker worker = new Worker.Builder()
                .recordProcessorFactory(
                        new ExampleRecordProcessorFactory("examples-table")
                )
                .config(
                        new KinesisClientLibConfiguration(
                                "kcl-java-example",
                                "kcl-sample",
                                DefaultAWSCredentialsProviderChain.getInstance(),
                                generateWorkerId()
                        ).withRegionName("us-east-1")
                        .withInitialLeaseTableReadCapacity(1)
                        .withInitialLeaseTableWriteCapacity(1)
                )
                .build();

        // Start the worker.
        worker.run();
    }

    private static String generateWorkerId() {
        try {
            return InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
        } catch (UnknownHostException e) {
            throw new RuntimeException("Could not generate worker ID.", e);
        }
    }

}

(Optional) Gracefully stopping the worker

Call Worker#startGracefulShutdown() to stop a running **worker** safely (if records are currently being processed, it finishes processing them and records a checkpoint before stopping).

Normally you call this from a Java VM shutdown hook ([Runtime#addShutdownHook()](https://docs.oracle.com/javase/jp/9/docs/api/java/lang/Runtime.html#addShutdownHook-java.lang.Thread-)) so that the **worker** is stopped safely when the KCL application's JVM process exits.

App


final Worker worker = ...;

// Shutdown worker gracefully using shutdown hook.
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    try {
        worker.startGracefulShutdown().get();
    } catch (Throwable e) {
        e.printStackTrace();
    }
}));

Gracefully stopping the record processor

If you want the **worker** to shut down gracefully, the record processor must also be implemented to support graceful shutdown (by implementing IShutdownNotificationAware, as shown below).

ExampleRecordProcessor


public class ExampleRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {

    :

    @Override
    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        // Record checkpoint at graceful shutdown.
        recordCheckpoint(checkpointer);
    }

}

Record processor life cycle

Record processor state transition


     +-------------------+
     | Waiting on Parent |                               +------------------+
+----+       Shard       |                               |     Shutdown     |
|    |                   |          +--------------------+   Notification   |
|    +----------+--------+          |  Shutdown:         |     Requested    |
|               | Success           |   Requested        +-+-------+--------+
|               |                   |                      |       |
|        +------+-------------+     |                      |       | Shutdown:
|        |    Initializing    +-----+                      |       |  Requested
|        |                    |     |                      |       |
|        |                    +-----+-------+              |       |
|        +---------+----------+     |       | Shutdown:    | +-----+-------------+
|                  | Success        |       |  Terminated  | |     Shutdown      |
|                  |                |       |  Zombie      | |   Notification    +-------------+
|           +------+-------------+  |       |              | |     Complete      |             |
|           |     Processing     +--+       |              | ++-----------+------+             |
|       +---+                    |          |              |  |           |                    |
|       |   |                    +----------+              |  |           | Shutdown:          |
|       |   +------+-------------+          |              \  /           |  Requested         |
|       |          |                        |               \/            +--------------------+
|       |          |                        |               ||
|       | Success  |                        |               || Shutdown:
|       +----------+                        |               ||  Terminated
|                                           |               ||  Zombie
|                                           |               ||
|                                           |               ||
|                                           |           +---++--------------+
|                                           |           |   Shutting Down   |
|                                           +-----------+                   |
|                                                       |                   |
|                                                       +--------+----------+
|                                                                |
|                                                                | Shutdown:
|                                                                |  All Reasons
|                                                                |
|                                                                |
|      Shutdown:                                        +--------+----------+
|        All Reasons                                    |     Shutdown      |
+-------------------------------------------------------+     Complete      |
                                                        |                   |
                                                        +-------------------+

When each record processor method is called

| Method | 1. When the worker starts | 2. When records are received | 3. When a shard is closed (※1) | 4. When a shard is opened (※1) | 5. When the worker is stopped gracefully |
|---|---|---|---|---|---|
| IRecordProcessor#initialize() | ① Acquire resources, etc. | - | - | ① Acquire resources, etc. | - |
| IRecordProcessor#processRecords() | - | ① Process the received records<br>② Record a checkpoint | - | - | - |
| IRecordProcessor#shutdown() (※2) | - | - | reason=TERMINATE<br>① Record a checkpoint | - | reason=ZOMBIE |
| IShutdownNotificationAware#shutdownRequested() (※2) | - | - | - | - | ① Record a checkpoint |

- ※1: If you increase or decrease the number of shards in a Kinesis stream, new shards are opened after the existing shards are closed. That is, #shutdown() is called on the record processors for the existing shards, and #initialize() is called on the record processors for the new shards.
- ※2: While #processRecords() is running, #shutdown() and #shutdownRequested() are not called until the running #processRecords() has completed.

Error handling

Record processor error handling

The behavior when an exception is thrown from each method implemented in the record processor is as follows.

| Method | Behavior when an exception is thrown |
|---|---|
| IRecordProcessor#initialize() | Called again repeatedly until it returns normally. |
| IRecordProcessor#processRecords() | An error log is output and the records passed as the argument are skipped. |
| IRecordProcessor#shutdown() | Called again repeatedly until it returns normally. |
| IShutdownNotificationAware#shutdownRequested() | Called again repeatedly until it returns normally. |

If you do not want records skipped when an error occurs during processing

If an error occurs during record processing (#processRecords()) in the **record processor**, the basic stance of **KCL** is to skip those records and move on to the next ones.

If your application requirements do not allow records to be skipped (for example, records must be processed in order), **KCL** has no built-in mechanism for this, so you need to implement one yourself.
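
The retry() helper in the ExampleRecordProcessor above is one such mechanism: it blocks inside processRecords() until the batch succeeds, so the records are never skipped and no checkpoint is recorded for them. Below is an alternative, purely illustrative sketch with capped exponential backoff; the class and method names are made up.

import java.util.concurrent.TimeUnit;

public final class BlockingRetry {

    private BlockingRetry() {}

    // Call from inside processRecords(): the method does not return (and KCL therefore
    // cannot skip the batch) until the action has completed successfully.
    public static void runUntilSuccess(Runnable action) {
        long backoffMillis = 1_000L;
        while (true) {
            try {
                action.run();
                return; // processed successfully
            } catch (RuntimeException e) {
                System.err.println("Processing failed, retrying: " + e.getMessage());
                try {
                    TimeUnit.MILLISECONDS.sleep(backoffMillis);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("Interrupted while retrying.", ie);
                }
                backoffMillis = Math.min(backoffMillis * 2, 60_000L); // cap at 60 s
            }
        }
    }
}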

Operation of KCL application

Logging

The log output from **KCL** can be controlled by the following loggers.

| Logger | Description |
|---|---|
| com.amazonaws.services.kinesis.clientlibrary | Normally INFO. Set it to DEBUG if you want KCL's debug logs. |
| com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker | At INFO this logger keeps emitting noisy "Sleeping ..." messages, so it is a good idea to set it to WARN. |

Checkpoint table management

References
