[AWS x Java] Dynamo DB-Streams mit Kinesis Client Library (KCL) verarbeiten

Einführung

Ich hatte die Möglichkeit, DynamoDB-Streams mit der Kinesis Client Library (KCL) in einem tatsächlichen Projekt zu verarbeiten. Daher möchte ich vorstellen, wie sich KCL tatsächlich verhält und wie es darauf basierend implementiert wurde.

3c1c3976-0a28-5cb8-3eab-b1ec4ac37f5c.png

Was ich diesmal machen wollte

Einige Funktionen von BookLive! verwenden DynamoDB. Es wurde notwendig, diese DynamoDB-Daten so bald wie möglich nach dem Hinzufügen der Daten an Redshift zu übertragen. Als Ergebnis verschiedener Studien, die auf dem aktuellen Tabellenformat basieren, kamen wir zu dem Schluss, dass es am besten ist, DynamoDB-Streams festzulegen und die hinzuzufügenden Daten als sogenannten Stream zu behandeln, um sie zu Redshift zu bringen. Darüber hinaus sollte die Methode zum Erfassen von DynamoDB-Streams in der offiziell empfohlenen Verwendung von KCL erfolgen. Ich habe es gemacht.

5b0cb9a9-cadf-5765-7a79-1c5cdb93d307.png

wichtiger Punkt

Verwandte Technologien in diesem Artikel

build.gradle


dependencies {
    implementation 'com.amazonaws:aws-java-sdk-dynamodb:1.11.833'
    implementation 'com.amazonaws:aws-java-sdk-kinesis:1.11.839'
}

build.gradle


dependencies {
    implementation 'com.amazonaws:dynamodb-streams-kinesis-adapter:1.5.2'
}

DynamoDB-Streams und KCL

Informationen zu DynamoDB-Streams

Charakteristisch

Bild

Es puffert Streams, als wären sie Kinesis-Datenströme (Sie können sich den Kinesis-Shards-Teil in der folgenden Abbildung als DynamoDB-Streams vorstellen). Sie können KCL nicht direkt verwenden, sondern DynamoDB über den Kinesis-Adapter (https://github.com/awslabs/dynamodb-streams-kinesis-adapter) auf dieselbe Weise wie das Abrufen eines Streams aus Kinesis-Datenströmen. Es ist möglich, Streams zu erhalten. Bitte beachten Sie, dass die folgende Abbildung nur ein persönliches Bild ist.

8909fee9-e83c-cebf-1051-e78979697d23.png

Informationen zur Kinesis Client Library (KCL)

Was ist Kinesis überhaupt?

Ich werde es hier nicht erwähnen, aber ich werde einen Referenzartikel veröffentlichen.

Charakteristisch

--Öffnen Sie die Quellbibliothek für Verbraucheranwendungen, um Kinesis-Datenströme zu verarbeiten

KCL-Terminologie

Der folgende Inhalt gilt auch für DynamoDB-Streams (Kinesis-Datenströme können als DynamoDB-Streams gelesen werden).

--Scherbe

KCL-Logik (vereinfachte Version)

Diejenigen mit "()" sind Methoden, und nur wichtige Teile werden extrahiert und beschrieben. "Consumer" ist der Teil, den Sie implementieren, indem Sie eine Worker-Instanz erstellen und "run ()" aufrufen.

■ Bis der Datensatzprozessor generiert wird

fLLTJnD157tVNz745tqWCJuQGsWga8qD6jh4HEF3savmGvanCrCDFZ4x4L6M50qKZDLA97me50mXn0ZwOoxJyZEytMvnsj1W8m_BjEVUstFEFTCE2UbnMPmgE2b6HN6AyCJ0OE8-uvDukfdWHNhrckF4hst4fxxqlOB_Wc01PP8as3C4pl67hLD6Uf8yIhXZRjPDfjq7xjYeunGOUvf84qyGK5iG787-0leOz6i8tZLgQut5.png

Dämon oder Charge

Die Schleife, in der runProcessLoop () ausgeführt wird, ist so konzipiert, dass sie eine unbegrenzte Schleife durchführt, bis das Herunterfahren für die laufende Worker-Instanz angefordert und das Herunterfahren abgeschlossen ist. Mit anderen Worten

--Wenn die run () `-Methode des Workers im Hauptthread der Consumer-Anwendung ausgeführt wird, arbeitet sie im Grunde genommen semipermanent wie ein Daemon weiter. ――Daher ist es erforderlich, einen Unterfaden zu erstellen und die Worker-Instanz an diesen zu übergeben, um in einem Stapelstil mit regulärem Start / Ende zu arbeiten, was der Zweck dieser Zeit ist. Sie können den Worker auch beenden, indem Sie nach einer bestimmten Zeit eine Anforderung zum Herunterfahren vom Hauptthread an den Worker senden.

■ Verarbeitung des Plattenprozessors selbst

Das Folgende ist der Prozess, nachdem der Worker im obigen Sequenzdiagramm eine ShardConsumer-Instanz erstellt hat. Dieser Prozess wird in der Schleife "[In Shard-Einheit ausführen]" im obigen Sequenzdiagramm ausgeführt.

rPMnRjGm6CTtFuMbBgbKcAiRggOXWKX74iMfAe7GAUG-GNH9Z6odL4peF35rQnSU0IR4WEZRM7qFx4k5pCM8jcHf9klpv-zd__z_AOTSb4pqRODEA5i1muyEC7bRicL0EzwtkZ33rR3SbBrTX3eg0DDJOAFEbqN9LsalAjh2AvoanI98dsVpi0WHidkTc3SW4ZWJPiG_A7yKY2Y5XftuSH4dJnTfiiZyF4oA1Ndcvq_8c1kU.png

Ausnahmen, einschließlich RuntimeException, werden von KCL unterdrückt

Alle Ausnahmen, die innerhalb des sogenannten Datensatzprozessors auftreten, der die Schnittstelle "IRecordProcessor" implementiert, werden zerstört. Mit anderen Worten, unabhängig davon, welche Ausnahme im Datensatzprozessor auftritt, stürzt die Anwendung nicht ab. Dieser negative Effekt tritt beim Aufzeichnen von Prüfpunkten auf. Wie Sie dem Sequenzdiagramm entnehmen können, wird jede Methode von "IRecordProcessor" auf dem Unterfaden ausgeführt. Außerdem werden Ausnahmen, die in diesem Thread auftreten, von KCL ignoriert, wie bereits erwähnt. Mit anderen Worten, selbst wenn eine Ausnahme in "processRecords ()" auftritt, wird der Thread so behandelt, als würde er normal enden. Übrigens, wenn der Checkpoint an "processRecords ()" übergeben wird, wird festgestellt, dass es sich um die neueste Sequenz des Shard-Lease handelt. Diese Sequenz wird auch bei "shutdown ()" übergeben. Wenn Sie also einen Prüfpunkt bei "shutdown ()" aufzeichnen, wenn der Thread von "processRecords ()" abnormal beendet wird, geht der Stream verloren, der fehlgeschlagen sein sollte. Ich werde.

Implementierung von Consumer-Anwendungen basierend auf KCL-Spezifikationen

Es unterscheidet sich von der tatsächlichen Implementierung, ist jedoch so geschrieben, dass nur die Grundform der Logik übertragen wird. (Da ich es beim Extrahieren geschrieben habe, ist es möglicherweise nicht möglich, es so auszuführen, wie es ist.)

StreamsAdapter

--Eintrittspunktklasse --Worker wird in einem Subthread für die Stapelverarbeitung ausgeführt, der die Ausführung / Beendigung umfasst (im Beispiel 5 Minuten gültig).

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.InstanceProfileCredentialsProvider;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
import com.amazonaws.services.cloudwatch.AmazonCloudWatchClientBuilder;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClientBuilder;
import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient;
import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsWorkerFactory;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;

import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class StreamsAdapter {

    private static final String tableName = "tableName";
    private static final String appName = "appName";
    private static final String workerId = "workerId";
    private static final AWSCredentialsProvider credentialsProvider = new InstanceProfileCredentialsProvider(true);

    public static void main(String... args) {
        // RecordProcessFactory
        S3Client s3Client = S3Client.builder().region(Region.AP_NORTHEAST_1).build();
        ShardReadWriteLock shardReadWriteLock = new ShardReadWriteLock();
        IRecordProcessorFactory factory = new RecordProcessorFactory(s3Client, shardReadWriteLock);

        // DynamoDB
        AmazonDynamoDB dynamoDB = AmazonDynamoDBClientBuilder.standard().withRegion(Regions.AP_NORTHEAST_1).build();
        String streamArn = dynamoDB.describeTable(tableName).getTable().getLatestStreamArn();

        // DynamoDB Streams
        AmazonDynamoDBStreams streams = AmazonDynamoDBStreamsClientBuilder.standard()
                .withRegion(Regions.AP_NORTHEAST_1).build();
        AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient(streams);

        // KCL Configuration
        KinesisClientLibConfiguration kclConfig = new KinesisClientLibConfiguration(appName, streamArn, credentialsProvider, workerId)
                .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);

        // CloudWatch
        AmazonCloudWatch cloudWatch = AmazonCloudWatchClientBuilder.standard().withRegion(Regions.AP_NORTHEAST_1).build();

        // KCL Worker
        Worker worker = StreamsWorkerFactory.createDynamoDbStreamsWorker(factory, kclConfig, adapterClient, dynamoDB, cloudWatch);

        System.out.println("Starting stream processing...");
        ExecutorService es = Executors.newSingleThreadExecutor();
        es.execute(worker);
        try {
            // Enable RecordProcessor for 5 minutes.
            // Check for exception every 30 seconds.
            boolean allSucceed = true;
            for (int i = 0; i < 10; i++) {
                Thread.sleep(30000);
                if (shardReadWriteLock.existsFailure()) {
                    allSucceed = false;
                    break;
                }
            }
            worker.startGracefulShutdown().get();

            if (!allSucceed) {
                throw new RuntimeException("Caught exception in processRecords().");
            }
        } catch (InterruptedException | ExecutionException ex) {
            ex.printStackTrace();
            throw new RuntimeException("Failed to process record of DynamoDB Streams via KCL.");
        } finally {
            es.shutdownNow();
        }
        System.out.println("Completed stream processing");
    }
}

RecordProcessorFactory

import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
import software.amazon.awssdk.services.s3.S3Client;

public class RecordProcessorFactory implements IRecordProcessorFactory {

    private S3Client s3Client;
    private ShardReadWriteLock shardReadWriteLock;

    public RecordProcessorFactory(S3Client s3Client, ShardReadWriteLock shardReadWriteLock) {
        this.s3Client = s3Client;
        this.shardReadWriteLock = shardReadWriteLock;
    }

    @Override
    public IRecordProcessor createProcessor() {
        return new RecordProcessor(s3Client, shardReadWriteLock);
    }
}

RecordProcessor

import com.amazonaws.services.dynamodbv2.streamsadapter.model.RecordAdapter;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.amazonaws.services.kinesis.model.Record;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

import java.util.Objects;
import java.util.Optional;

public class RecordProcessor implements IRecordProcessor, IShutdownNotificationAware {

    private final String bucket = "bucket";
    private final String objectKey = "objectKey";

    private S3Client s3Client;
    private ShardReadWriteLock shardReadWriteLock;
    private String shardId;

    public RecordProcessor(S3Client s3Client, ShardReadWriteLock shardReadWriteLock) {
        this.s3Client = s3Client;
        this.shardReadWriteLock = shardReadWriteLock;
    }

    @Override
    public void initialize(InitializationInput initializationInput) {
        shardId = initializationInput.getShardId();
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        Optional<ShardReadWriteLock.ShardProcessStatus> status = shardReadWriteLock.read(shardId);
        if (!Objects.isNull(status) && status.equals(ShardReadWriteLock.ShardProcessStatus.FAILURE)) {
            throw new RuntimeException("Shard " + shardId + " have failed.");
        }

        shardReadWriteLock.write(shardId, ShardReadWriteLock.ShardProcessStatus.INITIALIZE);

        try {
            for (Record record : processRecordsInput.getRecords()) {
                if (record instanceof RecordAdapter) {
                    com.amazonaws.services.dynamodbv2.model.Record streamRecord = ((RecordAdapter) record)
                            .getInternalObject();

                    if (!streamRecord.getEventName().equals("INSERT")) {
                        continue;
                    }

                    s3Client.putObject(PutObjectRequest.builder().bucket(bucket).key(objectKey).build(),
                            RequestBody.fromString(String.valueOf(streamRecord)));
                }
            }

            shardReadWriteLock.write(shardId, ShardReadWriteLock.ShardProcessStatus.SUCCESS);
        } catch (Exception ex) {
            shardReadWriteLock.write(shardId, ShardReadWriteLock.ShardProcessStatus.FAILURE);

            processRecordsInput.getRecords().forEach(s -> System.err.println(((RecordAdapter) s).getInternalObject()));

            ex.printStackTrace();
            throw new RuntimeException("Caught exception in process on shard " + shardId);
        }
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
            try {
                shutdownInput.getCheckpointer().checkpoint();
            } catch (InvalidStateException | ShutdownException ex) {
                ex.printStackTrace();
            }
        }
    }

    @Override
    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        try {
            checkpointer.checkpoint();
        } catch (InvalidStateException | ShutdownException ex) {
            ex.printStackTrace();
        }
    }
}

ShardReadWriteLock


import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ShardReadWriteLock {

    public enum ShardProcessStatus {

        INITIALIZE,
        SUCCESS,
        FAILURE;
    }

    private Map<String, ShardProcessStatus> shardsProcessStatus = new HashMap<>();

    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final Lock readLock = lock.readLock();
    private final Lock writeLock = lock.writeLock();

    public Optional<ShardProcessStatus> read(String shardId) {
        readLock.lock();
        try {
            return Optional.ofNullable(shardsProcessStatus.get(shardId));
        } finally {
            readLock.unlock();
        }
    }

    /**
     *Überprüfen Sie alle Shards auf den Status FAILURE
     */
    public boolean existsFailure() {
        readLock.lock();
        try {
            boolean exists = false;
            for (Map.Entry<String, ShardProcessStatus> entry : shardsProcessStatus.entrySet()) {
                if (entry.getValue().equals(ShardProcessStatus.FAILURE)) {
                    exists = true;
                    break;
                }
            }
            return exists;
        } finally {
            readLock.unlock();
        }
    }

    public void write(String shardId, ShardProcessStatus status) {
        writeLock.lock();
        shardsProcessStatus.put(shardId, status);
        writeLock.unlock();
    }
}

Zusammenfassung

Es war sehr schwierig, mit KCL im Batch-Stil zu verarbeiten. Durch Überwachen des Verarbeitungsstatus jedes Shards von einem externen Thread aus ist es möglich, einen Fehler zu erkennen und die Verarbeitung zu unterbrechen. Um zu wissen, wie processRecords () aufgerufen wird und wie Ausnahmen behandelt werden, haben wir die KCL-Verarbeitung in einem Sequenzdiagramm zusammengefasst. Wir hoffen, dass dies für diejenigen hilfreich ist, die Consumer-Anwendungen als Daemons ausführen, und für diejenigen, die Kinesis-Datenströme verwenden. Das Problem ist, dass, wenn im Prozess von processRecords () eine Ausnahme auftritt, der Datensatz in das Protokoll ausgegeben und manuell ausgeführt wird.

Referenz

Veränderung

2020/09/15

ReadLock wurde nicht durchgeführt, ändern Sie also den Quellcode

Der Quellcode der "Implementierung der Verbraucheranwendung basierend auf den KCL-Spezifikationen" wurde geändert. Die Implementierung war so, dass Read Lock der ShardReadWriteLock-Klasse nicht möglich war. Wir haben auch Änderungen an den Klassen "StreamsAdapter" und "RecordProcessor" vorgenommen, die die Methode "read ()" derselben Klasse aufrufen.

Recommended Posts

[AWS x Java] Dynamo DB-Streams mit Kinesis Client Library (KCL) verarbeiten
[Java EE] Implementieren Sie den Client mit WebSocket
AWS SDK für Java 1.11.x und 2.x.
Verwenden von Java mit AWS Lambda-Eclipse-Vorbereitung