J'ai eu l'occasion de traiter les flux DynamoDB avec la bibliothèque cliente Kinesis (KCL) dans un projet réel, je voudrais donc présenter le comportement réel de KCL et comment il a été implémenté en fonction de cela.
Certaines fonctionnalités de BookLive! utilisent DynamoDB. Il est devenu nécessaire de transférer ces données DynamoDB vers Redshift dès que possible après avoir ajouté les données. À la suite de diverses études basées sur le format de table actuel, nous sommes arrivés à la conclusion qu'il est préférable de définir les flux DynamoDB et de gérer les données à ajouter en tant que soi-disant flux pour les amener à Redshift. En outre, la méthode d'acquisition des flux DynamoDB doit être effectuée dans le [Comment utiliser KCL] officiellement recommandé (https://docs.aws.amazon.com/ja_jp/amazondynamodb/latest/developerguide/Streams.KCLAdapter.html). Je l'ai fait.
build.gradle
dependencies {
implementation 'com.amazonaws:aws-java-sdk-dynamodb:1.11.833'
implementation 'com.amazonaws:aws-java-sdk-kinesis:1.11.839'
}
build.gradle
dependencies {
implementation 'com.amazonaws:dynamodb-streams-kinesis-adapter:1.5.2'
}
Il met en mémoire tampon les flux comme s'il s'agissait de flux de données Kinesis (vous pouvez considérer la partie Kinesis Shards dans la figure ci-dessous comme des flux DynamoDB). Vous ne pouvez pas utiliser KCL directement, mais DynamoDB via l'adaptateur Kinesis (https://github.com/awslabs/dynamodb-streams-kinesis-adapter) de la même manière que pour obtenir un flux à partir de Kinesis Data Streams. Il est possible d'obtenir des flux. Veuillez noter que la figure ci-dessous n'est qu'une image personnelle.
Je ne le mentionnerai pas ici, mais je publierai un article de référence.
Le contenu suivant est synonyme de flux DynamoDB (les flux de données Kinesis peuvent être lus en tant que flux DynamoDB).
--Tesson
bailKey :: String
.
«Nous utilisons cette méthode car nous gérons les ressources AWS avec Terraform.Celles avec ()
sont des méthodes, et seules les parties importantes sont extraites et décrites.
Consumer
est la partie que vous implémentez en créant une instance Worker et en appelant run ()
.
La boucle dans laquelle runProcessLoop ()
est exécuté est conçue pour boucler indéfiniment jusqu'à ce que l'arrêt soit demandé pour l'instance Worker en cours d'exécution et que l'arrêt soit terminé.
En d'autres termes
--Lorsque la méthode run ()
du Worker est exécutée dans le thread principal de l'application consommateur, elle continue essentiellement à fonctionner de manière semi-permanente comme un démon.
―― D'autre part, pour fonctionner dans un style batch avec début / fin régulier, ce qui est le but de cette fois, il est nécessaire de créer un sous-thread et de lui passer l'instance Worker. Vous pouvez également mettre fin au Worker en envoyant une demande d'arrêt du thread principal au Worker après un certain laps de temps.
Ce qui suit est le processus après que le Worker dans le diagramme de séquence ci-dessus a créé une instance ShardConsumer. Ce processus est exécuté dans la «boucle [Execute in shard unit]» du diagramme de séquence ci-dessus.
Toutes les exceptions qui se produisent dans le soi-disant processeur d'enregistrement qui implémente l'interface ʻIRecordProcessor sont écrasées. En d'autres termes, l'application ne plantera pas, quelle que soit l'exception qui se produit dans le processeur d'enregistrement. Cet effet négatif se produit lors de l'enregistrement des points de contrôle. Comme vous pouvez le voir sur le diagramme de séquence, chaque méthode de ʻIRecordProcessor
est exécutée sur le sous-thread.
En outre, les exceptions qui se produisent dans ce thread sont ignorées par KCL, comme mentionné précédemment.
En d'autres termes, même si une exception se produit dans processRecords ()
, le thread sera traité comme s'il se terminait normalement.
En passant, lorsque le point de contrôle est passé à processRecords ()
, il est déterminé qu'il s'agit de la dernière séquence du bail de partition.
Cette séquence est également passée à shutdown ()
, donc si vous enregistrez un point de contrôle à shutdown ()
lorsque le thread deprocessRecords ()
se termine anormalement, le flux qui aurait dû échouer sera perdu. Je vais.
Elle est différente de l'implémentation réelle, mais elle est écrite de manière à ce que seule la forme de base de la logique soit transmise. (Puisque je l'ai écrit lors de son extraction, il peut ne pas être possible de l'exécuter tel quel.)
StreamsAdapter
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.InstanceProfileCredentialsProvider;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
import com.amazonaws.services.cloudwatch.AmazonCloudWatchClientBuilder;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClientBuilder;
import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient;
import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsWorkerFactory;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class StreamsAdapter {
private static final String tableName = "tableName";
private static final String appName = "appName";
private static final String workerId = "workerId";
private static final AWSCredentialsProvider credentialsProvider = new InstanceProfileCredentialsProvider(true);
public static void main(String... args) {
// RecordProcessFactory
S3Client s3Client = S3Client.builder().region(Region.AP_NORTHEAST_1).build();
ShardReadWriteLock shardReadWriteLock = new ShardReadWriteLock();
IRecordProcessorFactory factory = new RecordProcessorFactory(s3Client, shardReadWriteLock);
// DynamoDB
AmazonDynamoDB dynamoDB = AmazonDynamoDBClientBuilder.standard().withRegion(Regions.AP_NORTHEAST_1).build();
String streamArn = dynamoDB.describeTable(tableName).getTable().getLatestStreamArn();
// DynamoDB Streams
AmazonDynamoDBStreams streams = AmazonDynamoDBStreamsClientBuilder.standard()
.withRegion(Regions.AP_NORTHEAST_1).build();
AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient(streams);
// KCL Configuration
KinesisClientLibConfiguration kclConfig = new KinesisClientLibConfiguration(appName, streamArn, credentialsProvider, workerId)
.withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);
// CloudWatch
AmazonCloudWatch cloudWatch = AmazonCloudWatchClientBuilder.standard().withRegion(Regions.AP_NORTHEAST_1).build();
// KCL Worker
Worker worker = StreamsWorkerFactory.createDynamoDbStreamsWorker(factory, kclConfig, adapterClient, dynamoDB, cloudWatch);
System.out.println("Starting stream processing...");
ExecutorService es = Executors.newSingleThreadExecutor();
es.execute(worker);
try {
// Enable RecordProcessor for 5 minutes.
// Check for exception every 30 seconds.
boolean allSucceed = true;
for (int i = 0; i < 10; i++) {
Thread.sleep(30000);
if (shardReadWriteLock.existsFailure()) {
allSucceed = false;
break;
}
}
worker.startGracefulShutdown().get();
if (!allSucceed) {
throw new RuntimeException("Caught exception in processRecords().");
}
} catch (InterruptedException | ExecutionException ex) {
ex.printStackTrace();
throw new RuntimeException("Failed to process record of DynamoDB Streams via KCL.");
} finally {
es.shutdownNow();
}
System.out.println("Completed stream processing");
}
}
RecordProcessorFactory
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
import software.amazon.awssdk.services.s3.S3Client;
public class RecordProcessorFactory implements IRecordProcessorFactory {
private S3Client s3Client;
private ShardReadWriteLock shardReadWriteLock;
public RecordProcessorFactory(S3Client s3Client, ShardReadWriteLock shardReadWriteLock) {
this.s3Client = s3Client;
this.shardReadWriteLock = shardReadWriteLock;
}
@Override
public IRecordProcessor createProcessor() {
return new RecordProcessor(s3Client, shardReadWriteLock);
}
}
RecordProcessor
import com.amazonaws.services.dynamodbv2.streamsadapter.model.RecordAdapter;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.amazonaws.services.kinesis.model.Record;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import java.util.Objects;
import java.util.Optional;
public class RecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
private final String bucket = "bucket";
private final String objectKey = "objectKey";
private S3Client s3Client;
private ShardReadWriteLock shardReadWriteLock;
private String shardId;
public RecordProcessor(S3Client s3Client, ShardReadWriteLock shardReadWriteLock) {
this.s3Client = s3Client;
this.shardReadWriteLock = shardReadWriteLock;
}
@Override
public void initialize(InitializationInput initializationInput) {
shardId = initializationInput.getShardId();
}
@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
Optional<ShardReadWriteLock.ShardProcessStatus> status = shardReadWriteLock.read(shardId);
if (!Objects.isNull(status) && status.equals(ShardReadWriteLock.ShardProcessStatus.FAILURE)) {
throw new RuntimeException("Shard " + shardId + " have failed.");
}
shardReadWriteLock.write(shardId, ShardReadWriteLock.ShardProcessStatus.INITIALIZE);
try {
for (Record record : processRecordsInput.getRecords()) {
if (record instanceof RecordAdapter) {
com.amazonaws.services.dynamodbv2.model.Record streamRecord = ((RecordAdapter) record)
.getInternalObject();
if (!streamRecord.getEventName().equals("INSERT")) {
continue;
}
s3Client.putObject(PutObjectRequest.builder().bucket(bucket).key(objectKey).build(),
RequestBody.fromString(String.valueOf(streamRecord)));
}
}
shardReadWriteLock.write(shardId, ShardReadWriteLock.ShardProcessStatus.SUCCESS);
} catch (Exception ex) {
shardReadWriteLock.write(shardId, ShardReadWriteLock.ShardProcessStatus.FAILURE);
processRecordsInput.getRecords().forEach(s -> System.err.println(((RecordAdapter) s).getInternalObject()));
ex.printStackTrace();
throw new RuntimeException("Caught exception in process on shard " + shardId);
}
}
@Override
public void shutdown(ShutdownInput shutdownInput) {
if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
try {
shutdownInput.getCheckpointer().checkpoint();
} catch (InvalidStateException | ShutdownException ex) {
ex.printStackTrace();
}
}
}
@Override
public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
try {
checkpointer.checkpoint();
} catch (InvalidStateException | ShutdownException ex) {
ex.printStackTrace();
}
}
}
ShardReadWriteLock
--Adopte le modèle ReadWriteLock, qui est un modèle de conception multi-thread. --shardsProcessStatus est une carte où Key est l'ID de partition et la valeur est ShardProcessStatus, et il est implémenté de sorte que Lock est appliqué lors de la lecture / écriture de ceci. --INITIALIZE: Accordé au début du traitement processRecords () --SUCCESS: accordé lorsque le traitement processRecords () se termine normalement --FAILURE: accordé lorsqu'une exception se produit dans le traitement de processRecords ()
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ShardReadWriteLock {
public enum ShardProcessStatus {
INITIALIZE,
SUCCESS,
FAILURE;
}
private Map<String, ShardProcessStatus> shardsProcessStatus = new HashMap<>();
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final Lock readLock = lock.readLock();
private final Lock writeLock = lock.writeLock();
public Optional<ShardProcessStatus> read(String shardId) {
readLock.lock();
try {
return Optional.ofNullable(shardsProcessStatus.get(shardId));
} finally {
readLock.unlock();
}
}
/**
*Vérifiez tous les fragments pour le statut FAILURE
*/
public boolean existsFailure() {
readLock.lock();
try {
boolean exists = false;
for (Map.Entry<String, ShardProcessStatus> entry : shardsProcessStatus.entrySet()) {
if (entry.getValue().equals(ShardProcessStatus.FAILURE)) {
exists = true;
break;
}
}
return exists;
} finally {
readLock.unlock();
}
}
public void write(String shardId, ShardProcessStatus status) {
writeLock.lock();
shardsProcessStatus.put(shardId, status);
writeLock.unlock();
}
}
Il était très difficile de traiter par lots avec KCL. En surveillant l'état de traitement de chaque partition à partir d'un thread externe, il est possible de détecter une erreur et d'interrompre le traitement. Afin de savoir comment processRecords () est appelé et comment les exceptions sont gérées, nous avons résumé le traitement KCL dans un diagramme de séquence. Nous espérons qu'il sera utile à ceux qui exécutent des applications grand public en tant que démons et à ceux qui utilisent Kinesis Data Streams. Le problème est que lorsqu'une exception se produit dans le processus de processRecords (), l'enregistrement est sorti dans le journal et géré manuellement.
2020/09/15
Modification du code source de «l'implémentation d'une application grand public basée sur les spécifications KCL».
L'implémentation était telle que Read Lock de la classe ShardReadWriteLock
n'était pas possible.
Nous avons également apporté des modifications aux classes StreamsAdapter
et RecordProcessor
qui appellent la méthoderead ()
de la même classe.