[AWS x Java] Traiter les flux de base de données Dynamo avec la bibliothèque cliente Kinesis (KCL)

introduction

J'ai eu l'occasion de traiter les flux DynamoDB avec la bibliothèque cliente Kinesis (KCL) dans un projet réel, je voudrais donc présenter le comportement réel de KCL et comment il a été implémenté en fonction de cela.

3c1c3976-0a28-5cb8-3eab-b1ec4ac37f5c.png

Ce que je voulais faire cette fois

Certaines fonctionnalités de BookLive! utilisent DynamoDB. Il est devenu nécessaire de transférer ces données DynamoDB vers Redshift dès que possible après avoir ajouté les données. À la suite de diverses études basées sur le format de table actuel, nous sommes arrivés à la conclusion qu'il est préférable de définir les flux DynamoDB et de gérer les données à ajouter en tant que soi-disant flux pour les amener à Redshift. En outre, la méthode d'acquisition des flux DynamoDB doit être effectuée dans le [Comment utiliser KCL] officiellement recommandé (https://docs.aws.amazon.com/ja_jp/amazondynamodb/latest/developerguide/Streams.KCLAdapter.html). Je l'ai fait.

5b0cb9a9-cadf-5765-7a79-1c5cdb93d307.png

point important

Technologies associées dans cet article

build.gradle


dependencies {
    implementation 'com.amazonaws:aws-java-sdk-dynamodb:1.11.833'
    implementation 'com.amazonaws:aws-java-sdk-kinesis:1.11.839'
}

build.gradle


dependencies {
    implementation 'com.amazonaws:dynamodb-streams-kinesis-adapter:1.5.2'
}

Flux DynamoDB et KCL

À propos des flux DynamoDB

Fonctionnalité

image

Il met en mémoire tampon les flux comme s'il s'agissait de flux de données Kinesis (vous pouvez considérer la partie Kinesis Shards dans la figure ci-dessous comme des flux DynamoDB). Vous ne pouvez pas utiliser KCL directement, mais DynamoDB via l'adaptateur Kinesis (https://github.com/awslabs/dynamodb-streams-kinesis-adapter) de la même manière que pour obtenir un flux à partir de Kinesis Data Streams. Il est possible d'obtenir des flux. Veuillez noter que la figure ci-dessous n'est qu'une image personnelle.

8909fee9-e83c-cebf-1051-e78979697d23.png

À propos de la bibliothèque cliente Kinesis (KCL)

Qu'est-ce que Kinesis en premier lieu?

Je ne le mentionnerai pas ici, mais je publierai un article de référence.

Fonctionnalité

Terminologie KCL

Le contenu suivant est synonyme de flux DynamoDB (les flux de données Kinesis peuvent être lus en tant que flux DynamoDB).

--Tesson

Logique KCL (version simplifiée)

Celles avec () sont des méthodes, et seules les parties importantes sont extraites et décrites. Consumer est la partie que vous implémentez en créant une instance Worker et en appelant run ().

■ Jusqu'à ce que le processeur d'enregistrement soit généré

fLLTJnD157tVNz745tqWCJuQGsWga8qD6jh4HEF3savmGvanCrCDFZ4x4L6M50qKZDLA97me50mXn0ZwOoxJyZEytMvnsj1W8m_BjEVUstFEFTCE2UbnMPmgE2b6HN6AyCJ0OE8-uvDukfdWHNhrckF4hst4fxxqlOB_Wc01PP8as3C4pl67hLD6Uf8yIhXZRjPDfjq7xjYeunGOUvf84qyGK5iG787-0leOz6i8tZLgQut5.png

Démon ou lot

La boucle dans laquelle runProcessLoop () est exécuté est conçue pour boucler indéfiniment jusqu'à ce que l'arrêt soit demandé pour l'instance Worker en cours d'exécution et que l'arrêt soit terminé. En d'autres termes

--Lorsque la méthode run () du Worker est exécutée dans le thread principal de l'application consommateur, elle continue essentiellement à fonctionner de manière semi-permanente comme un démon. ―― D'autre part, pour fonctionner dans un style batch avec début / fin régulier, ce qui est le but de cette fois, il est nécessaire de créer un sous-thread et de lui passer l'instance Worker. Vous pouvez également mettre fin au Worker en envoyant une demande d'arrêt du thread principal au Worker après un certain laps de temps.

■ Traitement du processeur d'enregistrement lui-même

Ce qui suit est le processus après que le Worker dans le diagramme de séquence ci-dessus a créé une instance ShardConsumer. Ce processus est exécuté dans la «boucle [Execute in shard unit]» du diagramme de séquence ci-dessus.

rPMnRjGm6CTtFuMbBgbKcAiRggOXWKX74iMfAe7GAUG-GNH9Z6odL4peF35rQnSU0IR4WEZRM7qFx4k5pCM8jcHf9klpv-zd__z_AOTSb4pqRODEA5i1muyEC7bRicL0EzwtkZ33rR3SbBrTX3eg0DDJOAFEbqN9LsalAjh2AvoanI98dsVpi0WHidkTc3SW4ZWJPiG_A7yKY2Y5XftuSH4dJnTfiiZyF4oA1Ndcvq_8c1kU.png

Les exceptions, y compris RuntimeException, sont écrasées par KCL

Toutes les exceptions qui se produisent dans le soi-disant processeur d'enregistrement qui implémente l'interface ʻIRecordProcessor sont écrasées. En d'autres termes, l'application ne plantera pas, quelle que soit l'exception qui se produit dans le processeur d'enregistrement. Cet effet négatif se produit lors de l'enregistrement des points de contrôle. Comme vous pouvez le voir sur le diagramme de séquence, chaque méthode de ʻIRecordProcessor est exécutée sur le sous-thread. En outre, les exceptions qui se produisent dans ce thread sont ignorées par KCL, comme mentionné précédemment. En d'autres termes, même si une exception se produit dans processRecords (), le thread sera traité comme s'il se terminait normalement. En passant, lorsque le point de contrôle est passé à processRecords (), il est déterminé qu'il s'agit de la dernière séquence du bail de partition. Cette séquence est également passée à shutdown (), donc si vous enregistrez un point de contrôle à shutdown () lorsque le thread deprocessRecords ()se termine anormalement, le flux qui aurait dû échouer sera perdu. Je vais.

Implémentation d'applications grand public basées sur les spécifications KCL

Elle est différente de l'implémentation réelle, mais elle est écrite de manière à ce que seule la forme de base de la logique soit transmise. (Puisque je l'ai écrit lors de son extraction, il peut ne pas être possible de l'exécuter tel quel.)

StreamsAdapter

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.InstanceProfileCredentialsProvider;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
import com.amazonaws.services.cloudwatch.AmazonCloudWatchClientBuilder;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClientBuilder;
import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient;
import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsWorkerFactory;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;

import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class StreamsAdapter {

    private static final String tableName = "tableName";
    private static final String appName = "appName";
    private static final String workerId = "workerId";
    private static final AWSCredentialsProvider credentialsProvider = new InstanceProfileCredentialsProvider(true);

    public static void main(String... args) {
        // RecordProcessFactory
        S3Client s3Client = S3Client.builder().region(Region.AP_NORTHEAST_1).build();
        ShardReadWriteLock shardReadWriteLock = new ShardReadWriteLock();
        IRecordProcessorFactory factory = new RecordProcessorFactory(s3Client, shardReadWriteLock);

        // DynamoDB
        AmazonDynamoDB dynamoDB = AmazonDynamoDBClientBuilder.standard().withRegion(Regions.AP_NORTHEAST_1).build();
        String streamArn = dynamoDB.describeTable(tableName).getTable().getLatestStreamArn();

        // DynamoDB Streams
        AmazonDynamoDBStreams streams = AmazonDynamoDBStreamsClientBuilder.standard()
                .withRegion(Regions.AP_NORTHEAST_1).build();
        AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient(streams);

        // KCL Configuration
        KinesisClientLibConfiguration kclConfig = new KinesisClientLibConfiguration(appName, streamArn, credentialsProvider, workerId)
                .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);

        // CloudWatch
        AmazonCloudWatch cloudWatch = AmazonCloudWatchClientBuilder.standard().withRegion(Regions.AP_NORTHEAST_1).build();

        // KCL Worker
        Worker worker = StreamsWorkerFactory.createDynamoDbStreamsWorker(factory, kclConfig, adapterClient, dynamoDB, cloudWatch);

        System.out.println("Starting stream processing...");
        ExecutorService es = Executors.newSingleThreadExecutor();
        es.execute(worker);
        try {
            // Enable RecordProcessor for 5 minutes.
            // Check for exception every 30 seconds.
            boolean allSucceed = true;
            for (int i = 0; i < 10; i++) {
                Thread.sleep(30000);
                if (shardReadWriteLock.existsFailure()) {
                    allSucceed = false;
                    break;
                }
            }
            worker.startGracefulShutdown().get();

            if (!allSucceed) {
                throw new RuntimeException("Caught exception in processRecords().");
            }
        } catch (InterruptedException | ExecutionException ex) {
            ex.printStackTrace();
            throw new RuntimeException("Failed to process record of DynamoDB Streams via KCL.");
        } finally {
            es.shutdownNow();
        }
        System.out.println("Completed stream processing");
    }
}

RecordProcessorFactory

import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
import software.amazon.awssdk.services.s3.S3Client;

public class RecordProcessorFactory implements IRecordProcessorFactory {

    private S3Client s3Client;
    private ShardReadWriteLock shardReadWriteLock;

    public RecordProcessorFactory(S3Client s3Client, ShardReadWriteLock shardReadWriteLock) {
        this.s3Client = s3Client;
        this.shardReadWriteLock = shardReadWriteLock;
    }

    @Override
    public IRecordProcessor createProcessor() {
        return new RecordProcessor(s3Client, shardReadWriteLock);
    }
}

RecordProcessor

import com.amazonaws.services.dynamodbv2.streamsadapter.model.RecordAdapter;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.amazonaws.services.kinesis.model.Record;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

import java.util.Objects;
import java.util.Optional;

public class RecordProcessor implements IRecordProcessor, IShutdownNotificationAware {

    private final String bucket = "bucket";
    private final String objectKey = "objectKey";

    private S3Client s3Client;
    private ShardReadWriteLock shardReadWriteLock;
    private String shardId;

    public RecordProcessor(S3Client s3Client, ShardReadWriteLock shardReadWriteLock) {
        this.s3Client = s3Client;
        this.shardReadWriteLock = shardReadWriteLock;
    }

    @Override
    public void initialize(InitializationInput initializationInput) {
        shardId = initializationInput.getShardId();
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        Optional<ShardReadWriteLock.ShardProcessStatus> status = shardReadWriteLock.read(shardId);
        if (!Objects.isNull(status) && status.equals(ShardReadWriteLock.ShardProcessStatus.FAILURE)) {
            throw new RuntimeException("Shard " + shardId + " have failed.");
        }

        shardReadWriteLock.write(shardId, ShardReadWriteLock.ShardProcessStatus.INITIALIZE);

        try {
            for (Record record : processRecordsInput.getRecords()) {
                if (record instanceof RecordAdapter) {
                    com.amazonaws.services.dynamodbv2.model.Record streamRecord = ((RecordAdapter) record)
                            .getInternalObject();

                    if (!streamRecord.getEventName().equals("INSERT")) {
                        continue;
                    }

                    s3Client.putObject(PutObjectRequest.builder().bucket(bucket).key(objectKey).build(),
                            RequestBody.fromString(String.valueOf(streamRecord)));
                }
            }

            shardReadWriteLock.write(shardId, ShardReadWriteLock.ShardProcessStatus.SUCCESS);
        } catch (Exception ex) {
            shardReadWriteLock.write(shardId, ShardReadWriteLock.ShardProcessStatus.FAILURE);

            processRecordsInput.getRecords().forEach(s -> System.err.println(((RecordAdapter) s).getInternalObject()));

            ex.printStackTrace();
            throw new RuntimeException("Caught exception in process on shard " + shardId);
        }
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
            try {
                shutdownInput.getCheckpointer().checkpoint();
            } catch (InvalidStateException | ShutdownException ex) {
                ex.printStackTrace();
            }
        }
    }

    @Override
    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        try {
            checkpointer.checkpoint();
        } catch (InvalidStateException | ShutdownException ex) {
            ex.printStackTrace();
        }
    }
}

ShardReadWriteLock

--Adopte le modèle ReadWriteLock, qui est un modèle de conception multi-thread. --shardsProcessStatus est une carte où Key est l'ID de partition et la valeur est ShardProcessStatus, et il est implémenté de sorte que Lock est appliqué lors de la lecture / écriture de ceci. --INITIALIZE: Accordé au début du traitement processRecords () --SUCCESS: accordé lorsque le traitement processRecords () se termine normalement --FAILURE: accordé lorsqu'une exception se produit dans le traitement de processRecords ()


import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ShardReadWriteLock {

    public enum ShardProcessStatus {

        INITIALIZE,
        SUCCESS,
        FAILURE;
    }

    private Map<String, ShardProcessStatus> shardsProcessStatus = new HashMap<>();

    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final Lock readLock = lock.readLock();
    private final Lock writeLock = lock.writeLock();

    public Optional<ShardProcessStatus> read(String shardId) {
        readLock.lock();
        try {
            return Optional.ofNullable(shardsProcessStatus.get(shardId));
        } finally {
            readLock.unlock();
        }
    }

    /**
     *Vérifiez tous les fragments pour le statut FAILURE
     */
    public boolean existsFailure() {
        readLock.lock();
        try {
            boolean exists = false;
            for (Map.Entry<String, ShardProcessStatus> entry : shardsProcessStatus.entrySet()) {
                if (entry.getValue().equals(ShardProcessStatus.FAILURE)) {
                    exists = true;
                    break;
                }
            }
            return exists;
        } finally {
            readLock.unlock();
        }
    }

    public void write(String shardId, ShardProcessStatus status) {
        writeLock.lock();
        shardsProcessStatus.put(shardId, status);
        writeLock.unlock();
    }
}

Résumé

Il était très difficile de traiter par lots avec KCL. En surveillant l'état de traitement de chaque partition à partir d'un thread externe, il est possible de détecter une erreur et d'interrompre le traitement. Afin de savoir comment processRecords () est appelé et comment les exceptions sont gérées, nous avons résumé le traitement KCL dans un diagramme de séquence. Nous espérons qu'il sera utile à ceux qui exécutent des applications grand public en tant que démons et à ceux qui utilisent Kinesis Data Streams. Le problème est que lorsqu'une exception se produit dans le processus de processRecords (), l'enregistrement est sorti dans le journal et géré manuellement.

référence

Changement

2020/09/15

ReadLock n'a pas été fait, alors modifiez le code source

Modification du code source de «l'implémentation d'une application grand public basée sur les spécifications KCL». L'implémentation était telle que Read Lock de la classe ShardReadWriteLock n'était pas possible. Nous avons également apporté des modifications aux classes StreamsAdapter et RecordProcessor qui appellent la méthoderead ()de la même classe.

Recommended Posts

[AWS x Java] Traiter les flux de base de données Dynamo avec la bibliothèque cliente Kinesis (KCL)
[Java EE] Implémenter le client avec WebSocket
SDK AWS pour Java 1.11.x et 2.x
Utilisation de Java avec AWS Lambda-Eclipse Préparation