**Kinesis Client Library (KCL)** is a library provided by AWS for implementing consumer applications that receive and process the data (records) flowing through an *AWS Kinesis Data Stream*.
There are two ways to process Kinesis records, and **KCL** is used for the first of them: implementing your own consumer application.
However, it is hard to tell when the **record processor** is called and what happens when the **record processor** fails. If you get this wrong, Kinesis records can get stuck in the stream or be discarded unintentionally.
This article supplements the points that the official AWS Kinesis Data Streams guide does not spell out, and explains how to implement an application using **KCL**.
Note that **KCL** is available for *Java*, *JavaScript (Node.js)*, *Python*, *.NET*, and *Ruby*, but this article *covers the library for Java* (module 1.x, interface v2; details are given later).
Normally, getting records from a Kinesis stream requires several steps: getting the shards of the target stream, getting an iterator to scan the records in each shard, and looping over that iterator. (For details, see [Consumer Development Using the Kinesis Data Streams API and AWS SDK for Java](https://docs.aws.amazon.com/ja_jp/streams/latest/dev/developing-consumers-with-sdk.html).)
**KCL** performs these steps automatically and hands the records to your application.
A Kinesis stream is, as the name implies, a stream: unlike a queue, you cannot delete a record once you have retrieved it. Records remain in the stream until their retention period has passed.
Therefore, if the application does not remember how far it has processed the stream, stopping and restarting it means either processing every record in the stream again from the beginning (`TRIM_HORIZON`) or discarding every record that arrived while the application was stopped (`LATEST`).
**KCL** keeps track of which records have been processed, and when the application is stopped and restarted it resumes from the first record that has not been processed yet.
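The difference between the three starting points can be pictured with a small model. This is a sketch, not KCL code: `ResumeModel` and `startIndex` are hypothetical names, and the stream is modeled as a plain list in which a record's index stands in for its sequence number.

```java
import java.util.List;
import java.util.OptionalInt;

/** Toy model of where a consumer starts reading after a restart (not KCL code). */
class ResumeModel {
    enum InitialPosition { TRIM_HORIZON, LATEST }

    /**
     * Returns the index of the first record to process.
     * If a checkpoint is stored, reading resumes right after it;
     * otherwise the initial position decides where to start.
     */
    static int startIndex(List<String> stream,
                          OptionalInt lastCheckpointedIndex,
                          InitialPosition initialPosition) {
        if (lastCheckpointedIndex.isPresent()) {
            return lastCheckpointedIndex.getAsInt() + 1;
        }
        // No checkpoint: either replay everything or skip everything already in the stream.
        return initialPosition == InitialPosition.TRIM_HORIZON ? 0 : stream.size();
    }
}
```

With a stored checkpoint the initial position no longer matters, which is why a restarted KCL application neither replays the whole stream nor loses the records that arrived while it was down.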
To scale a Kinesis stream you increase the number of shards, but if you implement record retrieval yourself as described above, you also have to implement a mechanism that detects when shards have been added or removed.
**KCL** detects changes in the number of shards in the Kinesis stream and automatically starts processing newly opened shards.
A Kinesis stream guarantees that records with the same partition key always go to the same shard. (Set the partition key to an identifier that ties related records together, for example an order number.)
This means that related records are never scattered across multiple shards, so each shard can be processed in parallel, completely independently of the others. (Otherwise there would be little point in adding shards.)
**KCL** processes each detected shard in parallel, in a separate thread.
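Why the same partition key always lands in the same shard can be sketched as follows. Kinesis hashes the partition key with MD5 into a 128-bit integer and routes the record to the shard whose hash key range contains that value. The class and method names below are hypothetical, and the sketch assumes the shards split the hash space into (approximately) equal ranges, which is a simplification of a real stream's shard map.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Simplified model of partition-key routing (assumes equal-sized shard ranges). */
class ShardRouting {
    private static final BigInteger HASH_SPACE = BigInteger.ONE.shiftLeft(128); // 2^128

    /** MD5 of the partition key, read as an unsigned 128-bit integer. */
    static BigInteger hashKey(String partitionKey) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            return new BigInteger(1, digest);
        } catch (NoSuchAlgorithmException e) {
            // MD5 is one of the algorithms every Java platform must provide.
            throw new IllegalStateException(e);
        }
    }

    /** Index (0-based) of the shard whose range contains the key's hash. */
    static int shardIndex(String partitionKey, int shardCount) {
        return hashKey(partitionKey)
                .multiply(BigInteger.valueOf(shardCount))
                .divide(HASH_SPACE)
                .intValueExact();
    }
}
```

Because the mapping depends only on the key and the shard ranges, every record for, say, order `order-1001` reaches the same shard and therefore the same record processor.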
A **KCL** application usually handles all the shards of the target Kinesis stream in a single process, but as the number of shards grows, so does the degree of parallelism, and a single process may no longer be able to keep up.
In that case, you can launch multiple KCL application processes for the same Kinesis stream to distribute the load among them.
**KCL for Java** currently comes in the following versions. This article covers **module version 1.x, interface version v2**.
| Module version | Interface version | Package | Remarks |
|---|---|---|---|
| 1.x [GitHub] | v1 | `com.amazonaws.services.kinesis.clientlibrary.interfaces` | Also called the "original interface" |
| 1.x [GitHub] | v2 | `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2` | This is the version covered in this article |
| 2.x [GitHub] | - | `software.amazon.kinesis` | Required to take advantage of enhanced fan-out |
The differences between interface versions *v1* and *v2* of module version *1.x* are small and come down to two points.
Module versions *1.x* and *2.x* do not differ much in how you implement the **record processor**, but the API has been redesigned and the two are not compatible.
The **record processor** is a handler for processing the records flowing through a Kinesis stream. Its base interface defines three methods (initialization, record processing, and termination), and the application developer must implement all three.
A **record processor** is instantiated inside the KCL application in one-to-one correspondence with a shard of the Kinesis stream and is responsible for processing the records flowing through that shard.
A **checkpoint** indicates how far the records in the Kinesis stream (more precisely, in a shard) have been processed. Checkpoints are not recorded automatically; the application developer must record them at the appropriate points in the **record processor**. (The component used to record checkpoints is called the **checkpointer**.)
**Checkpoints** are recorded in *DynamoDB*. When you run a KCL application, **KCL** automatically creates the table in *DynamoDB*.
The **worker** manages the life cycle (creation and termination) of record processors according to the number of shards in the Kinesis stream. The application developer has to create and start a **worker** with the required parameters (such as which Kinesis stream to process).
There is only one **worker** in a KCL application.
Developing an application with **KCL** involves the steps described below.
Sample code (Java / Scala / Kotlin) is available on GitHub.
AWS also publishes official sample code (Java).
First, implement a **record processor** to process Kinesis records.
A record processor is used in a thread-safe manner: the methods of a given record processor instance are never called by multiple threads at the same time. The record processor can therefore keep the information it needs in instance variables.
- Implement the interface `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor`.
- Implement the initialization method `void initialize(InitializationInput)`.
  - If record processing requires any resources, initialize them here.
  - The `InitializationInput` parameter gives you the ID of the shard this record processor is in charge of, so keep it in an instance variable if necessary.
- Implement the record processing method `void processRecords(ProcessRecordsInput)`.
  - Process the records received from the Kinesis stream.
  - You can get the list of received records from the `ProcessRecordsInput` parameter.
  - After processing the records successfully, record a checkpoint using the **checkpointer** returned by `ProcessRecordsInput#getCheckpointer()`.
  - Checkpoints are recorded in *DynamoDB*, so exceptions such as insufficient capacity may occur under heavy load. Retry when an exception occurs to make sure the checkpoint is recorded.
  - Keep in mind that the same record can be processed more than once, for example when a KCL application is stopped and restarted. In other words, record processing should be idempotent.
- Implement the termination method `void shutdown(ShutdownInput)`.
  - **Note that this method is not called until any running `processRecords()` call has finished.**
  - Release the resources acquired during initialization.
  - Record a checkpoint only if the shutdown reason returned by `ShutdownInput#getShutdownReason()` is `TERMINATE`.
  - Checkpoints are recorded in *DynamoDB*, so exceptions such as insufficient capacity may occur under heavy load. Retry when an exception occurs to make sure the checkpoint is recorded.
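ExampleRecordProcessor
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.document.Table;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;

import java.util.Optional;

public class ExampleRecordProcessor implements IRecordProcessor {
    private final String tableName;

    ExampleRecordProcessor(String tableName) {
        this.tableName = tableName;
    }

    // Safe to keep as instance state: KCL never calls one instance from several threads at once.
    private String shardId;
    private AmazonDynamoDB dynamoDB;
    private Table table;

    @Override
    public void initialize(InitializationInput initializationInput) {
        shardId = initializationInput.getShardId();
        // Initialize any resources for #processRecords().
        dynamoDB = AmazonDynamoDBClientBuilder.defaultClient();
        table = new Table(dynamoDB, tableName);
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        // Process incoming records, retrying until the whole batch succeeds.
        retry(() -> {
            processRecordsInput.getRecords().forEach(record -> {
                System.out.println(record);
            });
        });
        // Record checkpoint if all incoming records processed successfully.
        recordCheckpoint(processRecordsInput.getCheckpointer());
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        // Record checkpoint at closing shard if shutdown reason is TERMINATE.
        if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
            recordCheckpoint(shutdownInput.getCheckpointer());
        }
        // Cleanup initialized resources.
        Optional.ofNullable(dynamoDB).ifPresent(AmazonDynamoDB::shutdown);
    }

    private void recordCheckpoint(IRecordProcessorCheckpointer checkpointer) {
        retry(() -> {
            try {
                checkpointer.checkpoint();
            } catch (Throwable e) {
                throw new RuntimeException("Record checkpoint failed.", e);
            }
        });
    }

    // Retries the action every 3 seconds until it succeeds.
    // A loop is used instead of recursion so that a long run of failures cannot overflow the stack.
    private void retry(Runnable f) {
        while (true) {
            try {
                f.run();
                return;
            } catch (Throwable e) {
                System.out.println(String.format("An error occurred: %s. Retrying...", e.getMessage()));
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e2) {
                    e2.printStackTrace();
                }
            }
        }
    }
}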
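The example above retries at a fixed 3-second interval. When the failure is DynamoDB running short of capacity, a growing delay between attempts may put less pressure on the table. The following is a minimal sketch of such a helper, not part of KCL: `BackoffRetry`, its parameters, and the injectable `sleeper` are all hypothetical names introduced for illustration.

```java
import java.util.function.LongConsumer;
import java.util.function.Supplier;

/** Hypothetical retry helper with capped exponential backoff (not part of KCL). */
class BackoffRetry {
    /**
     * Runs the task until it succeeds or maxAttempts is reached.
     * The delay doubles after every failure, up to maxDelayMillis.
     */
    static <T> T call(Supplier<T> task, int maxAttempts, long initialDelayMillis,
                      long maxDelayMillis, LongConsumer sleeper) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        long delay = initialDelayMillis;
        for (int attempt = 1; ; attempt++) {
            try {
                return task.get();
            } catch (RuntimeException e) {
                if (attempt >= maxAttempts) {
                    throw e; // give up and let the caller decide what to do
                }
                sleeper.accept(delay);
                delay = Math.min(delay * 2, maxDelayMillis);
            }
        }
    }

    /** Default sleeper: blocks the current thread and preserves the interrupt flag. */
    static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting to retry", e);
        }
    }
}
```

A checkpoint call could be wrapped as `BackoffRetry.call(() -> { doCheckpoint(); return null; }, Integer.MAX_VALUE, 500, 30_000, BackoffRetry::sleep)`, where `doCheckpoint()` stands for code that calls the checkpointer and rethrows its checked exceptions as runtime exceptions. Passing a very large `maxAttempts` keeps the article's "retry until the checkpoint is recorded" behavior.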
Next, implement the **record processor factory**. The **worker** uses this factory to create record processors.
- Implement the interface `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory`.
- Implement the record processor creation method `IRecordProcessor createProcessor()`.
  - Create and return an instance of the **record processor** implemented above.
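ExampleRecordProcessorFactory
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;

public class ExampleRecordProcessorFactory implements IRecordProcessorFactory {
    private final String tableName;

    ExampleRecordProcessorFactory(String tableName) {
        this.tableName = tableName;
    }

    // Called by the worker each time a record processor is needed for a shard.
    @Override
    public IRecordProcessor createProcessor() {
        return new ExampleRecordProcessor(tableName);
    }
}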
Finally, create and start a **worker** from the entry point (the *Main* class) of the KCL application.
- Use `com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.Builder` and its `build()` method to create the **worker**.
  - The read / write capacity of the DynamoDB table for recording checkpoints, which **KCL** creates automatically, defaults to `10`.
  - If you want to change it, specify the capacity in `KinesisClientLibConfiguration`.
- Call `Worker#run()` to start the created **worker**.
App
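import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.UUID;

public class App {
    public static void main(String... args) {
        // Create a Worker.
        final Worker worker = new Worker.Builder()
                .recordProcessorFactory(
                        new ExampleRecordProcessorFactory("examples-table")
                )
                .config(
                        new KinesisClientLibConfiguration(
                                "kcl-java-example",  // application name
                                "kcl-sample",        // Kinesis stream name
                                DefaultAWSCredentialsProviderChain.getInstance(),
                                generateWorkerId()
                        ).withRegionName("us-east-1")
                                // Capacity of the checkpoint table created by KCL (default 10).
                                .withInitialLeaseTableReadCapacity(1)
                                .withInitialLeaseTableWriteCapacity(1)
                )
                .build();
        // Start the worker.
        worker.run();
    }

    // Builds a worker ID that is unique per host and per process start.
    private static String generateWorkerId() {
        try {
            return InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
        } catch (UnknownHostException e) {
            throw new RuntimeException("Could not generate worker ID.", e);
        }
    }
}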
Call `Worker#startGracefulShutdown()` to stop a running **worker** safely (if records are being processed, it finishes processing them and records a checkpoint before stopping).
Normally you call it from a Java VM shutdown hook ([Runtime#addShutdownHook()](https://docs.oracle.com/javase/jp/9/docs/api/java/lang/Runtime.html#addShutdownHook-java.lang.Thread-)), so that the **worker** is stopped safely when the JVM process of the KCL application exits.
App
final Worker worker = ...;
// Shutdown worker gracefully using shutdown hook.
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    try {
        // Block until the graceful shutdown has completed.
        worker.startGracefulShutdown().get();
    } catch (Throwable e) {
        e.printStackTrace();
    }
}));
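The hook above waits indefinitely for the shutdown to complete. If you would rather bound that wait yourself, the future can be awaited with a timeout. The sketch below is an illustration under the assumption that `startGracefulShutdown()` returns a `Future<Boolean>`; `GracefulStop` and `await` are hypothetical names, and the helper itself uses only the standard library.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper that waits for a shutdown future, but only for a limited time. */
class GracefulStop {
    /** Returns true only if the shutdown completed successfully within the timeout. */
    static boolean await(Future<Boolean> shutdown, long timeout, TimeUnit unit) {
        try {
            return Boolean.TRUE.equals(shutdown.get(timeout, unit));
        } catch (TimeoutException e) {
            return false; // still shutting down when the time ran out
        } catch (ExecutionException e) {
            return false; // the shutdown itself failed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }
}
```

Inside the shutdown hook this could be used as `GracefulStop.await(worker.startGracefulShutdown(), 30, TimeUnit.SECONDS)`, logging a warning when it returns `false`. The 30-second value is only an example.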
If you want to shut a **worker** down safely, the record processor must also be implemented to support a safe shutdown.
- **Add the interface `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware` to the record processor.**
- Implement the shutdown request method `void shutdownRequested(IRecordProcessorCheckpointer)`.
  - **Note that this method is not called until any running `processRecords()` call has finished.**
  - Record a checkpoint.
  - After this method returns, the termination method `shutdown()` described above is also called, but its shutdown reason is `ZOMBIE`, so the checkpoint must be recorded in this method.
  - Checkpoints are recorded in *DynamoDB*, so exceptions such as insufficient capacity may occur under heavy load. Retry when an exception occurs to make sure the checkpoint is recorded.
ExampleRecordProcessor
public class ExampleRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
    // ... (the rest of the class is omitted)
    @Override
    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        // Record checkpoint at graceful shutdown.
        recordCheckpoint(checkpointer);
    }
}
State transitions of the record processor:
| State | Event | Next state |
|---|---|---|
| Waiting on Parent Shard | Success | Initializing |
| Waiting on Parent Shard | Shutdown: All Reasons | Shutdown Complete |
| Initializing | Success | Processing |
| Initializing | Shutdown: Requested | Shutdown Notification Requested |
| Initializing | Shutdown: Terminated / Zombie | Shutting Down |
| Processing | Success | Processing |
| Processing | Shutdown: Requested | Shutdown Notification Requested |
| Processing | Shutdown: Terminated / Zombie | Shutting Down |
| Shutdown Notification Requested | Shutdown: Requested | Shutdown Notification Complete |
| Shutdown Notification Requested | Shutdown: Terminated / Zombie | Shutting Down |
| Shutdown Notification Complete | Shutdown: Requested | Shutdown Notification Complete |
| Shutdown Notification Complete | Shutdown: Terminated / Zombie | Shutting Down |
| Shutting Down | Shutdown: All Reasons | Shutdown Complete |
| | 1. When the worker starts | 2. When receiving records | 3. At shard CLOSE (*1) | 4. At shard OPEN (*1) | 5. When the worker is safely stopped |
|---|---|---|---|---|---|
| `IRecordProcessor#initialize()` | ① Acquire resources, etc. | - | - | ① Acquire resources, etc. | - |
| `IRecordProcessor#processRecords()` | - | ① Process the received records ② Record a checkpoint | - | - | - |
| `IRecordProcessor#shutdown()` (*2) | - | - | ① reason=TERMINATE: record a checkpoint | - | ② reason=ZOMBIE |
| `IShutdownNotificationAware#shutdownRequested()` (*2) | - | - | - | - | ① Record a checkpoint |
- *1 If you increase or decrease the number of shards in the Kinesis stream, new shards are opened after the existing shards are closed. That is, `shutdown()` is called on the record processor of each existing shard, and `initialize()` is called on the record processor of each new shard.
- *2 While `processRecords()` is running, `shutdown()` and `shutdownRequested()` are not called until that `processRecords()` call has completed.
The behavior when an exception is thrown from each method implemented in the record processor is as follows.
| Method | Behavior when an exception is thrown |
|---|---|
| `IRecordProcessor#initialize()` | Called repeatedly until it returns normally. |
| `IRecordProcessor#processRecords()` | An error is logged and **the records passed as the argument are skipped**. |
| `IRecordProcessor#shutdown()` | Called repeatedly until it returns normally. |
| `IShutdownNotificationAware#shutdownRequested()` | Called repeatedly until it returns normally. |
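The consequence of the `processRecords()` row is easy to underestimate, so here is a toy model of it. This is not KCL's implementation: `SkipOnErrorModel` and `dispatch` are hypothetical names, and the model only mirrors the behavior in the table, where a batch whose handler throws is logged and never delivered again.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy model of "log the error and skip the batch" (not KCL's actual code). */
class SkipOnErrorModel {
    /**
     * Passes each batch to the handler exactly once.
     * Returns the batches that the handler completed without throwing.
     */
    static List<List<String>> dispatch(List<List<String>> batches,
                                       Consumer<List<String>> handler) {
        List<List<String>> completed = new ArrayList<>();
        for (List<String> batch : batches) {
            try {
                handler.accept(batch);
                completed.add(batch);
            } catch (RuntimeException e) {
                // The failed batch is only logged; the loop moves on to the next one.
                System.err.println("Skipping batch " + batch + ": " + e.getMessage());
            }
        }
        return completed;
    }
}
```

In this model a handler that throws on one batch silently loses that batch while later batches are still processed, which is the situation an application has to guard against when records must not be dropped.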
If an error occurs during record processing (`processRecords()`) in the **record processor**, the basic approach of **KCL** is to skip those records and move on to the next ones.
If the application's requirements do not allow records to be skipped (for example, records must be processed in order), you have to implement that yourself, because **KCL** provides no mechanism for it.
- When `processRecords()` encounters an error, keep retrying the processing without returning or throwing an exception.
  - If the error is transient (such as a temporary AWS outage), the retry recovers from it automatically.
  - If the error is permanent (such as an unexpected record), it does not recover automatically and the retry continues indefinitely.
- Once the record processor is stuck retrying, no further records are processed (records pile up in the stream), so you need to fix the application so that it can process the record.
- If the **worker** is implemented to stop safely and you stop a KCL application that is stuck retrying, the record processor keeps retrying inside `processRecords()`, so `shutdownRequested()` and `shutdown()` are never called and the shutdown keeps waiting until the shutdown hook times out and the JVM is forcibly terminated. (That is, the record processor does not record a checkpoint, and processing restarts from the failing record next time.)
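Since records after the last checkpoint are delivered again after such a restart, the processing itself has to tolerate duplicates. One common approach is to remember which records have already been applied, keyed by shard ID and sequence number (a sequence number is unique within a shard). The sketch below is a minimal illustration with hypothetical names; the in-memory `Set` stands in for a durable store, which a real application would need so that the information survives a restart.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical idempotent sink: applies each record at most once. */
class IdempotentSink {
    // Stand-in for a durable store (e.g. a database table) of applied record keys.
    private final Set<String> applied = new HashSet<>();
    private int total;

    /**
     * Adds the amount to the running total once per (shardId, sequenceNumber).
     * Returns false, without changing the total, when the record is a duplicate.
     */
    boolean apply(String shardId, String sequenceNumber, int amount) {
        if (!applied.add(shardId + ":" + sequenceNumber)) {
            return false;
        }
        total += amount;
        return true;
    }

    int total() {
        return total;
    }
}
```

Delivering the same record twice then leaves the result unchanged, so reprocessing after a restart is harmless.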
The log output of **KCL** can be controlled with the following loggers.
| Logger | Description |
|---|---|
| `com.amazonaws.services.kinesis.clientlibrary` | Normally `INFO`. Set it to `DEBUG` if you want KCL's debug logs. |
| `com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker` | At `INFO`, `Sleeping ...` messages are logged over and over and are noisy, so setting this to `WARN` is recommended. |
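How these levels are set depends on the logging backend in use. As one possibility, the sketch below sets them programmatically with `java.util.logging`, under the assumption that KCL's log output ends up in `java.util.logging` in your setup (with another backend such as Log4j, the same logger names would go into that backend's configuration instead). `KclLogLevels` is a hypothetical class name, and `WARN` corresponds to `Level.WARNING` here.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical helper that applies the logger levels from the table above. */
class KclLogLevels {
    // Keep strong references: java.util.logging holds loggers weakly,
    // so a level set on an otherwise unreferenced logger can be lost.
    private static final Logger KCL =
            Logger.getLogger("com.amazonaws.services.kinesis.clientlibrary");
    private static final Logger WORKER =
            Logger.getLogger("com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker");

    static void apply() {
        KCL.setLevel(Level.INFO);       // everything under the KCL package
        WORKER.setLevel(Level.WARNING); // silence the repeated "Sleeping ..." messages
    }
}
```

Calling `KclLogLevels.apply()` once at startup, before the worker is started, would be enough in this setup.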
The read / write capacity of the DynamoDB table for recording checkpoints stays fixed at its initial value (default `10`), but as the number of records flowing through the Kinesis stream increases, checkpoints are recorded more often, the table is accessed more frequently, and it may run out of capacity.
When capacity runs out, recording checkpoints fails with errors, so the capacity has to be increased.
Either increase the capacity manually whenever it becomes insufficient, or set up auto scaling.
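Besides adding capacity, the write load itself can be reduced by checkpointing less often, for example at most once per fixed interval instead of after every batch. The following is a sketch of that idea with hypothetical names (`CheckpointThrottle`, `maybeCheckpoint`); the clock is injected so that the behavior can be tested without waiting.

```java
import java.util.function.LongSupplier;

/** Hypothetical helper that lets a checkpoint through at most once per interval. */
class CheckpointThrottle {
    private final long intervalMillis;
    private final LongSupplier clock;
    private long nextAllowedAt;

    CheckpointThrottle(long intervalMillis, LongSupplier clock) {
        this.intervalMillis = intervalMillis;
        this.clock = clock;
        this.nextAllowedAt = clock.getAsLong(); // the first checkpoint is allowed immediately
    }

    /**
     * Runs the checkpoint action only if the interval has elapsed since the last one.
     * If the action throws, the interval is not restarted, so the next call tries again.
     */
    boolean maybeCheckpoint(Runnable checkpointAction) {
        long now = clock.getAsLong();
        if (now < nextAllowedAt) {
            return false;
        }
        checkpointAction.run();
        nextAllowedAt = now + intervalMillis;
        return true;
    }
}
```

A record processor could create `new CheckpointThrottle(60_000, System::currentTimeMillis)` and call `maybeCheckpoint(...)` at the end of `processRecords()`; the 60-second interval is only an example. The trade-off is that more records are reprocessed after a restart, which makes idempotent processing even more important.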
The checkpoint table is not deleted automatically, so you will need to manually delete the checkpoint table when you discontinue the KCL application.
AWS Official Guide
- Kinesis Client Library Consumer Development in Java: how to use **KCL for Java**.
- Status Tracking: explanation of each item in the checkpoint table.
- Resharding, Scaling, and Parallel Processing: behavior when the number of shards in a Kinesis stream changes.
- [Duplicate Record Handling: Consumer Retry](https://docs.aws.amazon.com/ja_jp/streams/latest/dev/kinesis-record-processor-duplicates.html#kinesis-record-processor-duplicates-consumer): explains that the same record may be processed multiple times in a KCL application.
- Recovery from Amazon Kinesis Data Streams failure: error handling in KCL applications.
- [Developing Consumers in Java with Kinesis Client Library 2.x Enhanced Fan-Out](https://docs.aws.amazon.com/ja_jp/streams/latest/dev/building-enhanced-consumers-kcl-java.html): how to use Kinesis enhanced fan-out.
- [(BDT403) Best Practices for Building Real-time Streaming Applications with Amazon Kinesis](https://www.slideshare.net/AmazonWebServices/bdt403-best-practices-for-building-realtime-streaming-applications-with-amazon-kinesis/15): from p. 15 onward, covers points to watch out for when implementing KCL applications.