I recently had the opportunity to process DynamoDB Streams with the Kinesis Client Library (KCL) in a real project, so in this article I describe how KCL actually behaves and how I implemented a consumer based on that behavior.
Some features of BookLive! use DynamoDB, and we needed to transfer newly added DynamoDB data to Redshift as soon as possible after it was written. After examining several options against the current table design, we concluded that the best approach was to enable DynamoDB Streams, treat the added data as a stream, and deliver it to Redshift. To read from DynamoDB Streams, we chose the officially recommended method, which is to use KCL.
- Consumer applications that use KCL are normally expected to run continuously on an EC2 instance. This time, however, processing is done in a batch style (started and stopped on a schedule), so some parts deviate from the standard usage.
- The DynamoDB capacity mode is set to on-demand, so this article does not discuss capacity.
build.gradle
dependencies {
    implementation 'com.amazonaws:aws-java-sdk-dynamodb:1.11.833'
    implementation 'com.amazonaws:aws-java-sdk-kinesis:1.11.839'
}
build.gradle
dependencies {
    implementation 'com.amazonaws:dynamodb-streams-kinesis-adapter:1.5.2'
}
- After data is written to DynamoDB, it is added to the stream with very low latency
- Shards scale automatically
  - In addition, a specific shard is not used indefinitely; new shards are created as needed
- Stream records are retained for 24 hours after they are added
- The stream setting can be enabled or disabled at any time
  - However, only data written to DynamoDB after the stream is enabled is buffered
DynamoDB Streams buffers the stream in much the same way as Kinesis Data Streams (you can think of the Kinesis Shards part in the figure below as DynamoDB Streams). KCL cannot be used with it directly, but through the Kinesis Adapter (https://github.com/awslabs/dynamodb-streams-kinesis-adapter) you can read DynamoDB Streams in the same way you would read a stream from Kinesis Data Streams. Please note that the figure below is only my own mental picture.
I won't cover it here, but I include a reference article.
- An open-source library that lets consumer applications handle Kinesis Data Streams
  - The consumer is the side that reads and processes the buffered stream
  - Conversely, the side that adds data to the stream is the producer
- KCL uses DynamoDB to automatically record how far each shard has been read, which is bookkeeping you need when working with Kinesis Data Streams
- KCL also fetches the records from the shards, so the developer basically only has to implement what happens after the records are received
The following terms mean the same thing when working with DynamoDB Streams (read "Kinesis Data Streams" as "DynamoDB Streams").
- Shard
  - The unit in which Kinesis Data Streams data is buffered in a distributed manner
  - According to the official FAQ, shards are the basic units of throughput for Amazon Kinesis data streams
- Lease
  - The state in which a worker is allowed to read records from a Kinesis Data Streams shard
  - The term seems to be used in the sense that a shard is lent to a worker (internally, the same shard is never read by more than one worker at a time)
- Lease table
  - A DynamoDB table that records lease status (the worker-to-shard mapping) and checkpoints
  - If it does not exist, KCL creates it automatically
    - The EC2 instance that runs the application therefore needs permission to create DynamoDB tables
  - Alternatively, you can create the DynamoDB table yourself, with the primary key set to leaseKey :: String
    - We use this approach because we manage our AWS resources with Terraform
    - In this case, the table name is the Kinesis application name, unchanged
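To make the worker-to-shard mapping more concrete, here is a minimal in-memory sketch of the kind of information a lease table holds. It is not KCL's implementation: the class `LeaseTableSketch` and all of its methods are hypothetical, and the only name taken from the text is `leaseKey`, which is modeled here as a map key holding the shard ID.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory model of a lease table; not KCL's actual implementation. */
final class LeaseTableSketch {

    /** One row: which worker holds the shard and how far it has checkpointed. */
    static final class Lease {
        final String owner;       // worker ID currently holding the lease
        final String checkpoint;  // last checkpointed sequence number, or null if none yet

        Lease(String owner, String checkpoint) {
            this.owner = owner;
            this.checkpoint = checkpoint;
        }
    }

    // Keyed by leaseKey, which in this sketch is simply the shard ID.
    private final Map<String, Lease> rows = new ConcurrentHashMap<>();

    /** Returns true only if no other worker already holds the lease for this shard. */
    boolean tryAcquire(String shardId, String workerId) {
        Lease existing = rows.putIfAbsent(shardId, new Lease(workerId, null));
        return existing == null || existing.owner.equals(workerId);
    }

    /** Records a checkpoint; only the current lease owner may do so. */
    boolean checkpoint(String shardId, String workerId, String sequenceNumber) {
        Lease updated = rows.computeIfPresent(shardId, (key, lease) ->
                lease.owner.equals(workerId) ? new Lease(workerId, sequenceNumber) : lease);
        return updated != null && updated.owner.equals(workerId);
    }

    /** Returns the stored checkpoint for the shard, or null if none has been recorded. */
    String checkpointOf(String shardId) {
        Lease lease = rows.get(shardId);
        return lease == null ? null : lease.checkpoint;
    }
}
```

In this model a second worker calling `tryAcquire` for the same shard is turned away, which corresponds to the point above that the same shard is not read by multiple workers.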
Items followed by () are methods; only the important parts are extracted and shown.
Consumer is the part you implement yourself: it creates a Worker instance and calls run().
The loop that executes runProcessLoop() is designed to repeat indefinitely until a shutdown is requested for the running Worker instance and that shutdown completes.
In other words:
- When the Worker's run() method is executed on the main thread of the consumer application, it basically keeps running indefinitely, like a daemon.
- To operate in a batch style with a scheduled start and end, which is the goal here, you need to create a sub-thread and hand the Worker instance to it. The worker can then be stopped by sending it a shutdown request from the main thread after a fixed period of time.
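The threading pattern described above can be sketched with the standard library alone. This is only an illustration under stated assumptions: `LoopingWorker` is a hypothetical stand-in for the KCL Worker (it just counts loop iterations), and `runFor` and `requestShutdown` are names invented for this sketch.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a KCL Worker, used only to show the threading pattern. */
final class BatchStyleWorkerSketch {

    /** Loops until shutdown is requested, like a daemon-style run() loop. */
    static final class LoopingWorker implements Runnable {
        private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
        final AtomicInteger iterations = new AtomicInteger();

        @Override
        public void run() {
            while (!shutdownRequested.get()) {
                iterations.incrementAndGet(); // one pass of the processing loop
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        void requestShutdown() {
            shutdownRequested.set(true);
        }
    }

    /** Runs the worker on a sub-thread for runMillis, then stops it from the calling thread. */
    static boolean runFor(LoopingWorker worker, long runMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(worker);
        try {
            Thread.sleep(runMillis);   // the batch window
            worker.requestShutdown();  // ask the loop to stop
            executor.shutdown();
            return executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();    // safety net in case the worker did not stop
        }
    }

    public static void main(String[] args) {
        LoopingWorker worker = new LoopingWorker();
        boolean stopped = runFor(worker, 200);
        System.out.println("stopped=" + stopped + ", iterations=" + worker.iterations.get());
    }
}
```

The key design point is that the loop itself never decides to stop; the calling thread owns the schedule and the worker only reacts to the shutdown request.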
The following is the processing that takes place after the Worker in the sequence diagram above creates a ShardConsumer instance.
This processing is executed inside the [Execute in shard unit] loop of that diagram.
All exceptions thrown inside the record processor, that is, the class that implements the IRecordProcessor interface, are swallowed by KCL. In other words, no matter what exception occurs in the record processor, the application does not crash. The downside of this behavior shows up when recording checkpoints.
As the sequence diagram shows, each method of IRecordProcessor is executed on a sub-thread. As mentioned above, KCL ignores exceptions that occur on that thread, so even if an exception occurs in processRecords(), the thread is treated as if it had finished normally.
The checkpoint, meanwhile, is set to the latest sequence number of the shard lease at the time the records are passed to processRecords(). The same sequence number is passed to shutdown(), so if processRecords() terminated abnormally and you then record a checkpoint in shutdown(), the records that should have been treated as failed are lost.
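The failure mode above is easier to see in a small simulation. The following sketch is not KCL code: the class `CheckpointLossSketch`, the integer sequence numbers, and the "poison" record that triggers the failure are all assumptions made for illustration. It models a library that swallows processor exceptions, keeps delivering batches, and checkpoints unconditionally at shutdown.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical simulation of the failure mode described above; not KCL code. */
final class CheckpointLossSketch {

    final List<Integer> processed = new ArrayList<>(); // sequence numbers handled successfully
    int lastDelivered = 0;                             // highest sequence number handed to the processor
    int checkpoint = 0;                                // highest sequence number recorded as done

    /** Application logic: fails on any batch that contains the poison sequence number. */
    void processRecords(List<Integer> batch, int poison) {
        if (batch.contains(poison)) {
            throw new IllegalStateException("failed on batch " + batch);
        }
        processed.addAll(batch);
    }

    /** Simulates the library side: delivers each batch and swallows whatever the processor throws. */
    void run(List<List<Integer>> batches, int poison) {
        for (List<Integer> batch : batches) {
            lastDelivered = batch.get(batch.size() - 1);
            try {
                processRecords(batch, poison);
            } catch (RuntimeException swallowed) {
                // Ignored on purpose, mirroring the behavior described in the text.
            }
        }
        checkpoint = lastDelivered; // unconditional checkpoint at shutdown
    }

    /** Records at or below the checkpoint that were never processed, i.e. silently lost. */
    List<Integer> lostRecords() {
        List<Integer> lost = new ArrayList<>();
        for (int seq = 1; seq <= checkpoint; seq++) {
            if (!processed.contains(seq)) {
                lost.add(seq);
            }
        }
        return lost;
    }

    public static void main(String[] args) {
        CheckpointLossSketch sim = new CheckpointLossSketch();
        sim.run(Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3, 4), Arrays.asList(5, 6)), 3);
        System.out.println("checkpoint=" + sim.checkpoint + ", lost=" + sim.lostRecords());
        // prints: checkpoint=6, lost=[3, 4]
    }
}
```

Because the checkpoint moves past the failed batch, a later run would start after sequence number 6 and never see records 3 and 4 again.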
The code below differs from the actual implementation; it is written to convey only the basic shape of the logic. (Because I extracted it from the real code as I wrote, it may not run as is.)
StreamsAdapter
- Entry point class
- The Worker is run on a sub-thread so that it can be started and stopped as a batch process (in this example it stays active for 5 minutes)
- Every 30 seconds, the main thread checks the processing status of each shard through ShardReadWriteLock; if even one exception has occurred (a ShardProcessStatus of FAILURE), it shuts the worker down immediately
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.InstanceProfileCredentialsProvider;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
import com.amazonaws.services.cloudwatch.AmazonCloudWatchClientBuilder;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClientBuilder;
import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient;
import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsWorkerFactory;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class StreamsAdapter {
    private static final String tableName = "tableName";
    private static final String appName = "appName";
    private static final String workerId = "workerId";
    private static final AWSCredentialsProvider credentialsProvider = new InstanceProfileCredentialsProvider(true);

    public static void main(String... args) {
        // RecordProcessorFactory
        S3Client s3Client = S3Client.builder().region(Region.AP_NORTHEAST_1).build();
        ShardReadWriteLock shardReadWriteLock = new ShardReadWriteLock();
        IRecordProcessorFactory factory = new RecordProcessorFactory(s3Client, shardReadWriteLock);

        // DynamoDB
        AmazonDynamoDB dynamoDB = AmazonDynamoDBClientBuilder.standard().withRegion(Regions.AP_NORTHEAST_1).build();
        String streamArn = dynamoDB.describeTable(tableName).getTable().getLatestStreamArn();

        // DynamoDB Streams
        AmazonDynamoDBStreams streams = AmazonDynamoDBStreamsClientBuilder.standard()
                .withRegion(Regions.AP_NORTHEAST_1).build();
        AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient(streams);

        // KCL Configuration
        KinesisClientLibConfiguration kclConfig = new KinesisClientLibConfiguration(appName, streamArn, credentialsProvider, workerId)
                .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);

        // CloudWatch
        AmazonCloudWatch cloudWatch = AmazonCloudWatchClientBuilder.standard().withRegion(Regions.AP_NORTHEAST_1).build();

        // KCL Worker
        Worker worker = StreamsWorkerFactory.createDynamoDbStreamsWorker(factory, kclConfig, adapterClient, dynamoDB, cloudWatch);

        System.out.println("Starting stream processing...");
        // Run the Worker on a sub-thread so the main thread can stop it later.
        ExecutorService es = Executors.newSingleThreadExecutor();
        es.execute(worker);
        try {
            // Enable RecordProcessor for 5 minutes (10 checks x 30 seconds).
            // Check for exception every 30 seconds.
            boolean allSucceed = true;
            for (int i = 0; i < 10; i++) {
                Thread.sleep(30000);
                if (shardReadWriteLock.existsFailure()) {
                    allSucceed = false;
                    break;
                }
            }
            // Request a graceful shutdown and wait for it to complete.
            worker.startGracefulShutdown().get();
            if (!allSucceed) {
                throw new RuntimeException("Caught exception in processRecords().");
            }
        } catch (InterruptedException | ExecutionException ex) {
            if (ex instanceof InterruptedException) {
                Thread.currentThread().interrupt(); // restore the interrupt flag
            }
            // Keep the original exception as the cause so it is not lost.
            throw new RuntimeException("Failed to process record of DynamoDB Streams via KCL.", ex);
        } finally {
            es.shutdownNow();
        }
        System.out.println("Completed stream processing");
    }
}
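The 30-second polling loop in main() can also be pulled out into a small helper, which makes the interval and the number of checks explicit and easy to test. This is a sketch, not part of the original implementation: `FailurePollerSketch` and `waitWhileHealthy` are hypothetical names, and the failure check is passed in as a `BooleanSupplier` so that the helper does not depend on any AWS class.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical helper that isolates the "poll for failure" loop; names are illustrative. */
final class FailurePollerSketch {

    /**
     * Checks failureDetected up to maxChecks times, sleeping intervalMillis before each check.
     * Returns true if the whole window elapsed without a failure, false as soon as one is seen.
     */
    static boolean waitWhileHealthy(BooleanSupplier failureDetected, int maxChecks, long intervalMillis) {
        for (int i = 0; i < maxChecks; i++) {
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt flag
                throw new IllegalStateException("Interrupted while waiting", e);
            }
            if (failureDetected.getAsBoolean()) {
                return false;
            }
        }
        return true;
    }
}
```

With this helper, the loop in main() would correspond to something like `waitWhileHealthy(shardReadWriteLock::existsFailure, 10, 30_000)`, followed by the graceful shutdown in either case.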
RecordProcessorFactory
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
import software.amazon.awssdk.services.s3.S3Client;
public class RecordProcessorFactory implements IRecordProcessorFactory {
    private final S3Client s3Client;
    private final ShardReadWriteLock shardReadWriteLock;

    public RecordProcessorFactory(S3Client s3Client, ShardReadWriteLock shardReadWriteLock) {
        this.s3Client = s3Client;
        this.shardReadWriteLock = shardReadWriteLock;
    }

    // Every record processor shares the same S3 client and status holder.
    @Override
    public IRecordProcessor createProcessor() {
        return new RecordProcessor(s3Client, shardReadWriteLock);
    }
}
RecordProcessor
- For clarity, processRecords() simply puts each received stream record to S3
- Exceptions (both Exception and RuntimeException) thrown from processRecords() are swallowed by KCL, so the whole body is wrapped in a try/catch for Exception; when an exception occurs, the status FAILURE is written to the ShardReadWriteLock passed in from the main thread
- Even after processRecords() has failed once, it may be called again, so if an exception has ever occurred for the target shard, the method throws immediately so that processing cannot proceed any further
  - See the beginning of processRecords()
- Because a checkpoint has to be recorded at shutdown, when an exception occurs in processRecords() all of the records received in that call are written to the log
  - I am assuming manual recovery from the log, but please let me know if there is a better way
import com.amazonaws.services.dynamodbv2.streamsadapter.model.RecordAdapter;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.amazonaws.services.kinesis.model.Record;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import java.util.Optional;

public class RecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
    private final String bucket = "bucket";
    private final String objectKey = "objectKey";
    private final S3Client s3Client;
    private final ShardReadWriteLock shardReadWriteLock;
    private String shardId;

    public RecordProcessor(S3Client s3Client, ShardReadWriteLock shardReadWriteLock) {
        this.s3Client = s3Client;
        this.shardReadWriteLock = shardReadWriteLock;
    }

    @Override
    public void initialize(InitializationInput initializationInput) {
        shardId = initializationInput.getShardId();
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        // If this shard has already failed once, refuse to process anything further.
        // read() returns an Optional, so compare the contained value, not the Optional itself.
        Optional<ShardReadWriteLock.ShardProcessStatus> status = shardReadWriteLock.read(shardId);
        if (status.isPresent() && status.get() == ShardReadWriteLock.ShardProcessStatus.FAILURE) {
            throw new RuntimeException("Shard " + shardId + " has failed.");
        }
        shardReadWriteLock.write(shardId, ShardReadWriteLock.ShardProcessStatus.INITIALIZE);
        try {
            for (Record record : processRecordsInput.getRecords()) {
                if (record instanceof RecordAdapter) {
                    com.amazonaws.services.dynamodbv2.model.Record streamRecord = ((RecordAdapter) record)
                            .getInternalObject();
                    // Only newly inserted items are of interest.
                    if (!streamRecord.getEventName().equals("INSERT")) {
                        continue;
                    }
                    // Simplified: every record is written to the same object key.
                    s3Client.putObject(PutObjectRequest.builder().bucket(bucket).key(objectKey).build(),
                            RequestBody.fromString(String.valueOf(streamRecord)));
                }
            }
            shardReadWriteLock.write(shardId, ShardReadWriteLock.ShardProcessStatus.SUCCESS);
        } catch (Exception ex) {
            // KCL swallows this exception, so report the failure to the main thread
            // and log every record of this call for manual recovery.
            shardReadWriteLock.write(shardId, ShardReadWriteLock.ShardProcessStatus.FAILURE);
            processRecordsInput.getRecords().forEach(r -> System.err.println(
                    r instanceof RecordAdapter ? ((RecordAdapter) r).getInternalObject() : r));
            ex.printStackTrace();
            throw new RuntimeException("Caught exception in process on shard " + shardId, ex);
        }
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
            try {
                shutdownInput.getCheckpointer().checkpoint();
            } catch (InvalidStateException | ShutdownException ex) {
                ex.printStackTrace();
            }
        }
    }

    @Override
    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        try {
            checkpointer.checkpoint();
        } catch (InvalidStateException | ShutdownException ex) {
            ex.printStackTrace();
        }
    }
}
ShardReadWriteLock
- Adopts the ReadWriteLock pattern, which is a multi-threaded design pattern
- shardsProcessStatus is a Map whose key is the shard ID and whose value is a ShardProcessStatus; a lock is taken whenever it is read or written
  - INITIALIZE: set when processRecords() starts processing
  - SUCCESS: set when processRecords() finishes normally
  - FAILURE: set when an exception occurs during processRecords()
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ShardReadWriteLock {
    public enum ShardProcessStatus {
        INITIALIZE,
        SUCCESS,
        FAILURE;
    }

    private final Map<String, ShardProcessStatus> shardsProcessStatus = new HashMap<>();
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final Lock readLock = lock.readLock();
    private final Lock writeLock = lock.writeLock();

    public Optional<ShardProcessStatus> read(String shardId) {
        readLock.lock();
        try {
            return Optional.ofNullable(shardsProcessStatus.get(shardId));
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Check all Shards for FAILURE status
     */
    public boolean existsFailure() {
        readLock.lock();
        try {
            return shardsProcessStatus.containsValue(ShardProcessStatus.FAILURE);
        } finally {
            readLock.unlock();
        }
    }

    public void write(String shardId, ShardProcessStatus status) {
        writeLock.lock();
        try {
            shardsProcessStatus.put(shardId, status);
        } finally {
            // Always release the lock, even if put() throws.
            writeLock.unlock();
        }
    }
}
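As a point of comparison, the same status tracking can be sketched without explicit locks by relying on ConcurrentHashMap. This swaps the ReadWriteLock pattern used above for a different technique and is not the implementation described in this article; `ShardStatusBoardSketch` is a hypothetical name, and making FAILURE "sticky" inside write() is a design choice of this sketch.

```java
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical alternative to ShardReadWriteLock built on ConcurrentHashMap instead of explicit locks. */
final class ShardStatusBoardSketch {

    enum Status { INITIALIZE, SUCCESS, FAILURE }

    private final ConcurrentMap<String, Status> statuses = new ConcurrentHashMap<>();

    Optional<Status> read(String shardId) {
        return Optional.ofNullable(statuses.get(shardId));
    }

    /** Once a shard is marked FAILURE it stays FAILURE, so a later SUCCESS cannot hide the error. */
    void write(String shardId, Status status) {
        statuses.merge(shardId, status, (current, next) -> current == Status.FAILURE ? current : next);
    }

    /** True if any shard is currently marked FAILURE. */
    boolean existsFailure() {
        return statuses.containsValue(Status.FAILURE);
    }
}
```

The explicit lock version makes the locking visible and matches the ReadWriteLock pattern by name; the map-based version is shorter, at the cost of hiding the synchronization inside the collection.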
Processing in a batch style with KCL was very difficult. By monitoring the processing status of each shard from an external thread, I was able to detect errors and interrupt processing. To understand how processRecords() is called and how exceptions are handled, I summarized KCL's processing in sequence diagrams. I hope this is also helpful to those who run consumer applications as daemons and to those who use Kinesis Data Streams. One remaining issue is that when an exception occurs in processRecords(), the records are written to the log and recovery has to be done by hand.
- Complete Program: DynamoDB Streams Kinesis Adapter
2020/09/15
The source code in the section "Implementation of consumer application based on KCL specifications" has been modified.
The previous implementation of the ShardReadWriteLock class could not take the read lock properly.
I have also changed the StreamsAdapter and RecordProcessor classes, which call the read() method of that class.