Ich hatte die Möglichkeit, DynamoDB-Streams mit der Kinesis Client Library (KCL) in einem tatsächlichen Projekt zu verarbeiten. Daher möchte ich vorstellen, wie sich KCL tatsächlich verhält und wie es darauf basierend implementiert wurde.
Einige Funktionen von BookLive! verwenden DynamoDB. Es wurde notwendig, diese DynamoDB-Daten so bald wie möglich nach dem Hinzufügen der Daten an Redshift zu übertragen. Als Ergebnis verschiedener Studien, die auf dem aktuellen Tabellenformat basieren, kamen wir zu dem Schluss, dass es am besten ist, DynamoDB-Streams festzulegen und die hinzuzufügenden Daten als sogenannten Stream zu behandeln, um sie zu Redshift zu bringen. Darüber hinaus sollte die Methode zum Erfassen von DynamoDB-Streams in der offiziell empfohlenen Verwendung von KCL erfolgen. Ich habe es gemacht.
build.gradle
dependencies {
implementation 'com.amazonaws:aws-java-sdk-dynamodb:1.11.833'
implementation 'com.amazonaws:aws-java-sdk-kinesis:1.11.839'
}
build.gradle
dependencies {
implementation 'com.amazonaws:dynamodb-streams-kinesis-adapter:1.5.2'
}
Es puffert Streams, als wären sie Kinesis-Datenströme (Sie können sich den Kinesis-Shards-Teil in der folgenden Abbildung als DynamoDB-Streams vorstellen). Sie können KCL nicht direkt verwenden, sondern DynamoDB über den Kinesis-Adapter (https://github.com/awslabs/dynamodb-streams-kinesis-adapter) auf dieselbe Weise wie das Abrufen eines Streams aus Kinesis-Datenströmen. Es ist möglich, Streams zu erhalten. Bitte beachten Sie, dass die folgende Abbildung nur ein persönliches Bild ist.
Ich werde es hier nicht erwähnen, aber ich werde einen Referenzartikel veröffentlichen.
--Öffnen Sie die Quellbibliothek für Verbraucheranwendungen, um Kinesis-Datenströme zu verarbeiten
Der folgende Inhalt gilt auch für DynamoDB-Streams (Kinesis-Datenströme können als DynamoDB-Streams gelesen werden).
--Scherbe
Diejenigen mit "()" sind Methoden, und nur wichtige Teile werden extrahiert und beschrieben. "Consumer" ist der Teil, den Sie implementieren, indem Sie eine Worker-Instanz erstellen und "run ()" aufrufen.
Die Schleife, in der runProcessLoop ()
ausgeführt wird, ist so konzipiert, dass sie eine unbegrenzte Schleife durchführt, bis das Herunterfahren für die laufende Worker-Instanz angefordert und das Herunterfahren abgeschlossen ist.
Mit anderen Worten
--Wenn die run () `-Methode des Workers im Hauptthread der Consumer-Anwendung ausgeführt wird, arbeitet sie im Grunde genommen semipermanent wie ein Daemon weiter. ――Daher ist es erforderlich, einen Unterfaden zu erstellen und die Worker-Instanz an diesen zu übergeben, um in einem Stapelstil mit regulärem Start / Ende zu arbeiten, was der Zweck dieser Zeit ist. Sie können den Worker auch beenden, indem Sie nach einer bestimmten Zeit eine Anforderung zum Herunterfahren vom Hauptthread an den Worker senden.
Das Folgende ist der Prozess, nachdem der Worker im obigen Sequenzdiagramm eine ShardConsumer-Instanz erstellt hat. Dieser Prozess wird in der Schleife "[In Shard-Einheit ausführen]" im obigen Sequenzdiagramm ausgeführt.
Alle Ausnahmen, die innerhalb des sogenannten Datensatzprozessors auftreten, der die Schnittstelle "IRecordProcessor" implementiert, werden zerstört. Mit anderen Worten, unabhängig davon, welche Ausnahme im Datensatzprozessor auftritt, stürzt die Anwendung nicht ab. Dieser negative Effekt tritt beim Aufzeichnen von Prüfpunkten auf. Wie Sie dem Sequenzdiagramm entnehmen können, wird jede Methode von "IRecordProcessor" auf dem Unterfaden ausgeführt. Außerdem werden Ausnahmen, die in diesem Thread auftreten, von KCL ignoriert, wie bereits erwähnt. Mit anderen Worten, selbst wenn eine Ausnahme in "processRecords ()" auftritt, wird der Thread so behandelt, als würde er normal enden. Übrigens, wenn der Checkpoint an "processRecords ()" übergeben wird, wird festgestellt, dass es sich um die neueste Sequenz des Shard-Lease handelt. Diese Sequenz wird auch bei "shutdown ()" übergeben. Wenn Sie also einen Prüfpunkt bei "shutdown ()" aufzeichnen, wenn der Thread von "processRecords ()" abnormal beendet wird, geht der Stream verloren, der fehlgeschlagen sein sollte. Ich werde.
Es unterscheidet sich von der tatsächlichen Implementierung, ist jedoch so geschrieben, dass nur die Grundform der Logik übertragen wird. (Da ich es beim Extrahieren geschrieben habe, ist es möglicherweise nicht möglich, es so auszuführen, wie es ist.)
StreamsAdapter
--Eintrittspunktklasse --Worker wird in einem Subthread für die Stapelverarbeitung ausgeführt, der die Ausführung / Beendigung umfasst (im Beispiel 5 Minuten gültig).
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.InstanceProfileCredentialsProvider;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
import com.amazonaws.services.cloudwatch.AmazonCloudWatchClientBuilder;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClientBuilder;
import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient;
import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsWorkerFactory;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class StreamsAdapter {
private static final String tableName = "tableName";
private static final String appName = "appName";
private static final String workerId = "workerId";
private static final AWSCredentialsProvider credentialsProvider = new InstanceProfileCredentialsProvider(true);
public static void main(String... args) {
// RecordProcessFactory
S3Client s3Client = S3Client.builder().region(Region.AP_NORTHEAST_1).build();
ShardReadWriteLock shardReadWriteLock = new ShardReadWriteLock();
IRecordProcessorFactory factory = new RecordProcessorFactory(s3Client, shardReadWriteLock);
// DynamoDB
AmazonDynamoDB dynamoDB = AmazonDynamoDBClientBuilder.standard().withRegion(Regions.AP_NORTHEAST_1).build();
String streamArn = dynamoDB.describeTable(tableName).getTable().getLatestStreamArn();
// DynamoDB Streams
AmazonDynamoDBStreams streams = AmazonDynamoDBStreamsClientBuilder.standard()
.withRegion(Regions.AP_NORTHEAST_1).build();
AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient(streams);
// KCL Configuration
KinesisClientLibConfiguration kclConfig = new KinesisClientLibConfiguration(appName, streamArn, credentialsProvider, workerId)
.withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);
// CloudWatch
AmazonCloudWatch cloudWatch = AmazonCloudWatchClientBuilder.standard().withRegion(Regions.AP_NORTHEAST_1).build();
// KCL Worker
Worker worker = StreamsWorkerFactory.createDynamoDbStreamsWorker(factory, kclConfig, adapterClient, dynamoDB, cloudWatch);
System.out.println("Starting stream processing...");
ExecutorService es = Executors.newSingleThreadExecutor();
es.execute(worker);
try {
// Enable RecordProcessor for 5 minutes.
// Check for exception every 30 seconds.
boolean allSucceed = true;
for (int i = 0; i < 10; i++) {
Thread.sleep(30000);
if (shardReadWriteLock.existsFailure()) {
allSucceed = false;
break;
}
}
worker.startGracefulShutdown().get();
if (!allSucceed) {
throw new RuntimeException("Caught exception in processRecords().");
}
} catch (InterruptedException | ExecutionException ex) {
ex.printStackTrace();
throw new RuntimeException("Failed to process record of DynamoDB Streams via KCL.");
} finally {
es.shutdownNow();
}
System.out.println("Completed stream processing");
}
}
RecordProcessorFactory
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
import software.amazon.awssdk.services.s3.S3Client;
public class RecordProcessorFactory implements IRecordProcessorFactory {
private S3Client s3Client;
private ShardReadWriteLock shardReadWriteLock;
public RecordProcessorFactory(S3Client s3Client, ShardReadWriteLock shardReadWriteLock) {
this.s3Client = s3Client;
this.shardReadWriteLock = shardReadWriteLock;
}
@Override
public IRecordProcessor createProcessor() {
return new RecordProcessor(s3Client, shardReadWriteLock);
}
}
RecordProcessor
import com.amazonaws.services.dynamodbv2.streamsadapter.model.RecordAdapter;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.amazonaws.services.kinesis.model.Record;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import java.util.Objects;
import java.util.Optional;
public class RecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
private final String bucket = "bucket";
private final String objectKey = "objectKey";
private S3Client s3Client;
private ShardReadWriteLock shardReadWriteLock;
private String shardId;
public RecordProcessor(S3Client s3Client, ShardReadWriteLock shardReadWriteLock) {
this.s3Client = s3Client;
this.shardReadWriteLock = shardReadWriteLock;
}
@Override
public void initialize(InitializationInput initializationInput) {
shardId = initializationInput.getShardId();
}
@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
Optional<ShardReadWriteLock.ShardProcessStatus> status = shardReadWriteLock.read(shardId);
if (!Objects.isNull(status) && status.equals(ShardReadWriteLock.ShardProcessStatus.FAILURE)) {
throw new RuntimeException("Shard " + shardId + " have failed.");
}
shardReadWriteLock.write(shardId, ShardReadWriteLock.ShardProcessStatus.INITIALIZE);
try {
for (Record record : processRecordsInput.getRecords()) {
if (record instanceof RecordAdapter) {
com.amazonaws.services.dynamodbv2.model.Record streamRecord = ((RecordAdapter) record)
.getInternalObject();
if (!streamRecord.getEventName().equals("INSERT")) {
continue;
}
s3Client.putObject(PutObjectRequest.builder().bucket(bucket).key(objectKey).build(),
RequestBody.fromString(String.valueOf(streamRecord)));
}
}
shardReadWriteLock.write(shardId, ShardReadWriteLock.ShardProcessStatus.SUCCESS);
} catch (Exception ex) {
shardReadWriteLock.write(shardId, ShardReadWriteLock.ShardProcessStatus.FAILURE);
processRecordsInput.getRecords().forEach(s -> System.err.println(((RecordAdapter) s).getInternalObject()));
ex.printStackTrace();
throw new RuntimeException("Caught exception in process on shard " + shardId);
}
}
@Override
public void shutdown(ShutdownInput shutdownInput) {
if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
try {
shutdownInput.getCheckpointer().checkpoint();
} catch (InvalidStateException | ShutdownException ex) {
ex.printStackTrace();
}
}
}
@Override
public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
try {
checkpointer.checkpoint();
} catch (InvalidStateException | ShutdownException ex) {
ex.printStackTrace();
}
}
}
ShardReadWriteLock
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ShardReadWriteLock {
public enum ShardProcessStatus {
INITIALIZE,
SUCCESS,
FAILURE;
}
private Map<String, ShardProcessStatus> shardsProcessStatus = new HashMap<>();
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final Lock readLock = lock.readLock();
private final Lock writeLock = lock.writeLock();
public Optional<ShardProcessStatus> read(String shardId) {
readLock.lock();
try {
return Optional.ofNullable(shardsProcessStatus.get(shardId));
} finally {
readLock.unlock();
}
}
/**
*Überprüfen Sie alle Shards auf den Status FAILURE
*/
public boolean existsFailure() {
readLock.lock();
try {
boolean exists = false;
for (Map.Entry<String, ShardProcessStatus> entry : shardsProcessStatus.entrySet()) {
if (entry.getValue().equals(ShardProcessStatus.FAILURE)) {
exists = true;
break;
}
}
return exists;
} finally {
readLock.unlock();
}
}
public void write(String shardId, ShardProcessStatus status) {
writeLock.lock();
shardsProcessStatus.put(shardId, status);
writeLock.unlock();
}
}
Es war sehr schwierig, mit KCL im Batch-Stil zu verarbeiten. Durch Überwachen des Verarbeitungsstatus jedes Shards von einem externen Thread aus ist es möglich, einen Fehler zu erkennen und die Verarbeitung zu unterbrechen. Um zu wissen, wie processRecords () aufgerufen wird und wie Ausnahmen behandelt werden, haben wir die KCL-Verarbeitung in einem Sequenzdiagramm zusammengefasst. Wir hoffen, dass dies für diejenigen hilfreich ist, die Consumer-Anwendungen als Daemons ausführen, und für diejenigen, die Kinesis-Datenströme verwenden. Das Problem ist, dass, wenn im Prozess von processRecords () eine Ausnahme auftritt, der Datensatz in das Protokoll ausgegeben und manuell ausgeführt wird.
2020/09/15
Der Quellcode der "Implementierung der Verbraucheranwendung basierend auf den KCL-Spezifikationen" wurde geändert. Die Implementierung war so, dass Read Lock der ShardReadWriteLock-Klasse nicht möglich war. Wir haben auch Änderungen an den Klassen "StreamsAdapter" und "RecordProcessor" vorgenommen, die die Methode "read ()" derselben Klasse aufrufen.