[Détails] Implémentation d'applications grand public avec Kinesis Client Library for Java

introduction

** Kinesis Client Library (KCL) ** est une bibliothèque conçue par AWS pour implémenter des applications grand public qui reçoivent et traitent des données (enregistrements) circulant dans * AWS Kinesis Data Stream *.

Pour traiter les enregistrements Kinesis

Il existe l'une ou l'autre des méthodes, mais ** KCL ** est utilisé dans le premier cas.

Cependant, il est difficile de comprendre quand le ** processeur d'enregistrement ** est appelé et ce qui se passe lorsque le ** processeur d'enregistrement ** renvoie une erreur. Si vous faites une erreur ici, Kinesis Les enregistrements peuvent rester dans le flux ou être supprimés par inadvertance.

Cet article complète les éléments non spécifiés dans le Guide officiel des flux de données AWS Kinesis. Je vais vous expliquer comment implémenter une application utilisant ** KCL **.

Notez que ** KCL ** a des bibliothèques pour * Java *, * JavaScript (Node.js) *, * Python *, * .NET * et * Ruby *, mais dans cet article, * Gère les bibliothèques pour Java * (interface 1.x v2. Les détails seront décrits plus loin).

Qu'est-ce que la bibliothèque cliente Kinesis?

Fonctions de KCL

Obtenez un enregistrement

Normalement, obtenir un enregistrement à partir d'un flux Kinesis nécessite les étapes consistant à obtenir un fragment du flux d'intérêt, à obtenir un itérateur pour parcourir les enregistrements dans le fragment et à parcourir l'itérateur. (Pour plus de détails, consultez Développement grand public à l'aide de l'API Kinesis Data Streams et du kit SDK AWS pour Java (https://docs.aws.amazon.com/ja_jp/streams/latest/dev/developing-consumers-with-sdk.html) ) Voir)

** KCL ** le fera automatiquement pour vous et obtiendra l'enregistrement pour vous.

Gestion de la position de traitement des enregistrements

Les flux Kinesis, comme leur nom l'indique, sont des flux, donc contrairement aux files d'attente, vous ne pouvez pas supprimer les enregistrements récupérés. Les enregistrements resteront dans le flux jusqu'à ce qu'un certain temps se soit écoulé.

Par conséquent, si vous ne vous souvenez pas à quel point l'application a traité les enregistrements dans le flux, tous les enregistrements du flux seront traités à nouveau depuis le début (TRIM_HORIZON) lorsque l'application est arrêtée / redémarrée, ou l'application sera arrêtée. Tous les enregistrements qui ont afflué seront supprimés («DERNIER»).

** KCL ** garde une trace des enregistrements qui ont été traités, et lorsque l'application est arrêtée / redémarrée, il reprend le traitement à partir des enregistrements qui n'ont pas encore été traités.

Suite à l'augmentation / diminution des fragments

Pour mettre à l'échelle le flux Kinesis, augmentez le nombre de fragments, mais si vous implémentez vous-même le processus d'acquisition d'enregistrements comme décrit ci-dessus, vous devrez implémenter un mécanisme pour détecter que les fragments ont augmenté (diminué). Je vais.

** KCL ** détecte l'augmentation ou la diminution des partitions dans le flux Kinesis et démarre automatiquement le traitement des partitions nouvellement ouvertes.

Traitement parallèle des fragments

Les flux Kinesis garantissent que les enregistrements avec la même clé de partition sont acheminés vers la même partition. (Définissez la clé de partition sur un identifiant unique pour l'enregistrement, par exemple "numéro de commande")

Cela signifie que les enregistrements associés ne sont pas dispersés sur plusieurs fragments, de sorte que chaque fragment peut être traité en parallèle de manière totalement indépendante. (Au contraire, cela n'a pas de sens d'augmenter les fragments autrement)

** KCL ** traite chaque partition détectée en parallèle dans un thread distinct.

Traitement distribué par multi-processus

Les applications ** KCL ** gèrent généralement toutes les partitions du flux Kinesis cible en un seul processus, mais à mesure que le nombre de partitions augmente, le degré de parallélisme augmente et un processus ne peut pas le gérer. Il y a une possibilité.

Dans ce cas, vous pouvez lancer plusieurs applications (processus) KCL pour le même flux Kinesis afin de répartir la charge entre les processus.

KCL pour la version Java

** KCL pour Java ** a actuellement les versions suivantes: Cet article traite de ** "Module version 1.x interface version v2" **.

Version du module Version de l'interface paquet Remarques
1.x [GitHub] v1 com.amazonaws.services.kinesis.clientlibrary.interfaces Aussi appelé "interface d'origine"
v2 com.amazonaws.services.kinesis.clientlibrary.interfaces.v2 Ceci est la version couverte dans cet article
2.x [GitHub] - software.amazon.kinesis PourprofiterdelarépartitionétendueUtilisezcetteversionNécessaire

La différence entre les versions d'interface * v1 * et * v2 * de la version du module * 1.x * est légère,

C'est deux points.

Les versions de module * 1.x * et * 2.x * ne diffèrent pas de manière significative dans l'utilisation de l'implémentation de ** record processor **, mais l'API a été repensée et n'est pas compatible.

Concept KCL

KCL.jpg

Processeur d'enregistrement

Un gestionnaire pour le traitement des enregistrements circulant dans le flux Kinesis. L'interface de base définit trois méthodes, «traitement d'initialisation», «traitement d'enregistrement» et «traitement de terminaison», et les développeurs d'applications doivent implémenter ces trois méthodes.

Le ** Processeur d'enregistrement ** est instancié dans une application KCL avec une correspondance biunivoque avec les fragments de flux Kinesis et est responsable du traitement des enregistrements circulant dans chaque fragment.

Point de contrôle

Indique dans quelle mesure les enregistrements du flux Kinesis (plus précisément, dans la partition) ont été traités. Les points de contrôle ne sont pas enregistrés automatiquement et doivent être correctement enregistrés par le développeur de l'application dans le ** Processeur d'enregistrement **. (Le composant d'enregistrement des points de contrôle s'appelle ** checkpointer **)

** Les points de contrôle ** sont enregistrés dans * DynamoDB *. Lorsque vous exécutez l'application KCL, ** KCL ** crée automatiquement une table dans * DynamoDB *.

ouvrier

La gestion du cycle de vie du processeur d'enregistrements (génération / arrêt) est effectuée en fonction du nombre de fragments dans le flux Kinesis. Le développeur de l'application doit générer et démarrer un ** worker ** avec les paramètres requis (tels que le flux Kinesis à traiter).

Il n'y a qu'un seul ** worker ** dans l'application KCL.

Implémentation de l'application KCL

Enregistrer l'implémentation du processeur

Tout d'abord, implémentez un ** processeur d'enregistrement ** pour traiter les enregistrements Kinesis.

Le processeur d'enregistrement est thread-safe. Chaque méthode du processeur d'enregistrement ne peut pas être appelée par plusieurs threads. Par conséquent, le processeur d'enregistrement peut avoir les informations nécessaires en tant que variable d'instance.

ExampleRecordProcessor


public class ExampleRecordProcessor implements IRecordProcessor {

    private final String tableName;

    ExampleRecordProcessor(String tableName) {
        this.tableName = tableName;
    }

    private String shardId;
    private AmazonDynamoDB dynamoDB;
    private Table table;

    @Override
    public void initialize(InitializationInput initializationInput) {
        shardId = initializationInput.getShardId();

        // Initialize any resources for #processRecords().
        dynamoDB = AmazonDynamoDBClientBuilder.defaultClient();
        table = new Table(dynamoDB, tableName);
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        // Processing incoming records.
        retry(() -> {
            processRecordsInput.getRecords().forEach(record -> {
                System.out.println(record);
            });
        });

        // Record checkpoint if all incoming records processed successfully.
        recordCheckpoint(processRecordsInput.getCheckpointer());
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        // Record checkpoint at closing shard if shutdown reason is TERMINATE.
        if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
            recordCheckpoint(shutdownInput.getCheckpointer());
        }

        // Cleanup initialized resources.
        Optional.ofNullable(dynamoDB).ifPresent(AmazonDynamoDB::shutdown);
    }

    private void recordCheckpoint(IRecordProcessorCheckpointer checkpointer) {
        retry(() -> {
            try {
                checkpointer.checkpoint();
            } catch (Throwable e) {
                throw new RuntimeException("Record checkpoint failed.", e);
            }
        });
    }

    private void retry(Runnable f) {
        try {
            f.run();
        } catch (Throwable e) {
            System.out.println(String.format("An error occurred %s. That will be retry...", e.getMessage()));
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e2) {
                e2.printStackTrace();
            }
            retry(f);
        }
    }

Mise en place d'une usine de traitement de disques

Ensuite, implémentez la ** Record Processor Factory ** qui sera utilisée pour générer le processeur d'enregistrement. ** Worker ** utilise cette usine pour générer un processeur d'enregistrement.

ExampleRecordProcessorFactory


public class ExampleRecordProcessorFactory implements IRecordProcessorFactory {

    private final String tableName;

    ExampleRecordProcessorFactory(String tableName) {
        this.tableName = tableName;
    }

    @Override
    public IRecordProcessor createProcessor() {
        return new ExampleRecordProcessor(tableName);
    }
}

Création et lancement des travailleurs

Enfin, à partir du point d'entrée (classe * Main *) de l'application KCL, créez et lancez un ** worker **.

Utilisez com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.Builder pour générer des * ** workers **.

La capacité de lecture / écriture de la table DynamoDB pour l'enregistrement des points de contrôle, qui est automatiquement créée par * ** KCL **, est par défaut «10». Si vous voulez le changer, vous pouvez le spécifier avec KinesisClientLibConfiguration.

App


public class App {

    public static void main(String... args) {

        // Create a Worker.
        final Worker worker = new Worker.Builder()
                .recordProcessorFactory(
                        new ExampleRecordProcessorFactory("examples-table")
                )
                .config(
                        new KinesisClientLibConfiguration(
                                "kcl-java-example",
                                "kcl-sample",
                                DefaultAWSCredentialsProviderChain.getInstance(),
                                generateWorkerId()
                        ).withRegionName("us-east-1")
                        .withInitialLeaseTableReadCapacity(1)
                        .withInitialLeaseTableWriteCapacity(1)
                )
                .build();

        // Start the worker.
        worker.run();
    }

    private static String generateWorkerId() {
        try {
            return InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
        } catch (UnknownHostException e) {
            throw new RuntimeException("Could not generate worker ID.", e);
        }
    }

}

(Facultatif) Arrêt sécurisé des travailleurs

Appelez Worker # startGracefulShutdown () pour arrêter en toute sécurité le ** worker ** démarré (s'il y a un enregistrement en cours, il terminera le traitement et enregistrera un point de contrôle avant de s'arrêter).

Normalement, un hook d'arrêt de machine virtuelle Java ([Runtime # addShutdownHook ()](https://docs.oracle.com/javase/jp/9/docs/api/java/lang/Runtime.html#addShutdownHook-java.lang. En appelant depuis Thread-)), vous pouvez arrêter en toute sécurité le ** worker ** à la fin du processus JVM dans votre application KCL.

App


final Worker worker = ...;

// Shutdown worker gracefully using shutdown hook.
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    try {
        worker.startGracefulShutdown().get();
    } catch (Throwable e) {
        e.printStackTrace();
    }
}));

Arrêt sécurisé du processeur d'enregistrement

Si vous souhaitez arrêter en toute sécurité ** workers **, vous devez également implémenter un processeur d'enregistrement pour arrêter en toute sécurité.

ExampleRecordProcessor


public class ExampleRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {

    :

    @Override
    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        // Record checkpoint at graceful shutdown.
        recordCheckpoint(checkpointer);
    }

}

Enregistrer le cycle de vie du processeur

Enregistrer la transition de l'état du processeur


     +-------------------+
     | Waiting on Parent |                               +------------------+
+----+       Shard       |                               |     Shutdown     |
|    |                   |          +--------------------+   Notification   |
|    +----------+--------+          |  Shutdown:         |     Requested    |
|               | Success           |   Requested        +-+-------+--------+
|               |                   |                      |       |
|        +------+-------------+     |                      |       | Shutdown:
|        |    Initializing    +-----+                      |       |  Requested
|        |                    |     |                      |       |
|        |                    +-----+-------+              |       |
|        +---------+----------+     |       | Shutdown:    | +-----+-------------+
|                  | Success        |       |  Terminated  | |     Shutdown      |
|                  |                |       |  Zombie      | |   Notification    +-------------+
|           +------+-------------+  |       |              | |     Complete      |             |
|           |     Processing     +--+       |              | ++-----------+------+             |
|       +---+                    |          |              |  |           |                    |
|       |   |                    +----------+              |  |           | Shutdown:          |
|       |   +------+-------------+          |              \  /           |  Requested         |
|       |          |                        |               \/            +--------------------+
|       |          |                        |               ||
|       | Success  |                        |               || Shutdown:
|       +----------+                        |               ||  Terminated
|                                           |               ||  Zombie
|                                           |               ||
|                                           |               ||
|                                           |           +---++--------------+
|                                           |           |   Shutting Down   |
|                                           +-----------+                   |
|                                                       |                   |
|                                                       +--------+----------+
|                                                                |
|                                                                | Shutdown:
|                                                                |  All Reasons
|                                                                |
|                                                                |
|      Shutdown:                                        +--------+----------+
|        All Reasons                                    |     Shutdown      |
+-------------------------------------------------------+     Complete      |
                                                        |                   |
                                                        +-------------------+

Lorsque chaque méthode du processeur d'enregistrement est appelée

1.Quand le travailleur commence 2.Lors de la réception d'un enregistrement 3.Au fragment CLOSE (* 1) 4.Lorsque le fragment est ouvert (* 1) 5.Lorsqu'un travailleur est arrêté en toute sécurité
IRecordProcessor#initialize() ① Sécurisation des ressources, etc. - - ① Sécurisation des ressources, etc. -
IRecordProcessor#processRecords() - ① Traitement des enregistrements reçus
② Enregistrement du point de contrôle
- - -
IRecordProcessor#shutdown()(※2) - - reason=TERMINATEEnregistrement des points de contrôle - reason=ZOMBIE
IShutdownNotificationAware#shutdownRequested()(※2) - - - - ① Enregistrement du point de contrôle

La gestion des erreurs

Traitement des erreurs du processeur d'enregistrement

Le comportement lorsqu'une exception est levée à partir de chaque méthode implémentée par le processeur d'enregistrement est le suivant.

Méthode Comportement lors de la levée d'une exception
IRecordProcessor#initialize() Il continuera à être appelé à plusieurs reprises jusqu'à ce qu'il revienne normalement.
IRecordProcessor#processRecords() Le journal des erreurs a été généré et passé en argumentIgnorer les enregistrementsSera fait.
IRecordProcessor#shutdown() Il continuera à être appelé à plusieurs reprises jusqu'à ce qu'il revienne normalement.
IShutdownNotificationAware#shutdownRequested() Il continuera à être appelé à plusieurs reprises jusqu'à ce qu'il revienne normalement.

Si vous ne souhaitez pas ignorer s'il y a une erreur dans le traitement de l'enregistrement

Si une erreur se produit dans le traitement de l'enregistrement (# processRecords ()) du ** processeur d'enregistrement **, l'idée de base de ** KCL ** est de sauter l'enregistrement et de passer à l'enregistrement suivant. ..

Si vous ne souhaitez pas ignorer les enregistrements en raison des exigences de l'application (par exemple, devoir traiter les enregistrements dans l'ordre), ** KCL ** n'a pas de mécanisme, alors implémentez-le vous-même. est nécessaire.

Fonctionnement de l'application KCL

Enregistrement

La sortie du journal de ** KCL ** peut être contrôlée par les enregistreurs suivants.

Enregistreur La description
com.amazonaws.services.kinesis.clientlibrary NormalementINFOKCLSi vous souhaitez afficher le journal de débogage deDEBUG
com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker INFOSi vous faites ce qui suitSleeping...Parce que c'est ennuyeux parce que le journal qui ditWARNIl est bon de le garder.

Gestion de la table des points de contrôle

Les références

Recommended Posts

[Détails] Implémentation d'applications grand public avec Kinesis Client Library for Java
[AWS x Java] Traiter les flux de base de données Dynamo avec la bibliothèque cliente Kinesis (KCL)
Implémentation de la méthode de clonage pour Java Record
Obtenir une liste d'informations MBean pour les applications Java
CI l'architecture des applications Java / Kotlin avec ArchUnit
Examen et mise en œuvre de la bibliothèque CSV pour le chargement de grandes quantités de données dans MySQL (Java)
[Java] Simplifiez la mise en œuvre de la gestion de l'historique des données avec Reladomo
Implémentation d'un analyseur de syntaxe mathématique par méthode d'analyse syntaxique descendante récursive (Java)
[Java] Exemple de projet de développement d'applications Web avec Spring Boot
[Java] Implémentation du réseau Faistel
Implémentation de XLPagerTabStrip avec TabBarController
Implémentation Java de tri-tree
Exemple d'utilisation de l'API Bulk de Salesforce à partir d'un client Java avec PK-chunking
[Code Pipeline x Elastic Beanstalk] Récapitulatif des erreurs et des contre-mesures pour les applications Java CI / CD vers Elastic Beanstalk avec Code Pipeline
[Pour les débutants] Résumé du constructeur java
[Java EE] Implémenter le client avec WebSocket
Transformez facilement les applications Java en Docker avec Jib
Activez OpenCV avec java8. (Pour moi-même)
Génériques Kotlin pour les développeurs Java
Implémentation d'une fonction similaire en Java
Obtenir une liste de fichiers S3 avec ListObjectsV2Request (AWS SDK for Java)
Pratique des bases de la programmation Java - Je veux afficher un triangle avec une instruction for ①
Pratique des bases de la programmation Java - Je veux afficher un triangle avec une instruction for ②