Flux de données Kinesis à partir de zéro expérience Java (1)

J'ai un besoin, je vais donc essayer de gérer Kinesis en Java.

--AWS est touché, mais les langages de script (Python, Javascript) sont les principaux

Un dossier d'étude d'un ingénieur.

Présentation d'Amazon Kinesis Data Streams

img Depuis la Documentation du développeur

En gros, un service de file d'attente pour de grandes quantités de données. Il existe d'autres services de file d'attente tels que SQS et Amazon MQ, et plus spécifiquement AWS IoT et Step Functions, mais Kinesis est un service qui se concentre particulièrement sur de grandes quantités de données.

--Augmentation / diminution de la capacité en raison de l'augmentation / diminution des fragments

Je pense que la zone est une grande caractéristique. plus loin

L'une des caractéristiques est que c'est fait. Bien sûr, comme d'autres services AWS, il s'agit d'un appel d'API, il peut donc être utilisé à partir de l'AWS CLI et du SDK AWS. D'autre part, en utilisant KPL et KCL [avantages en termes de performances et de coopération entre les deux](https://docs.aws.amazon.com/ja_jp/streams/latest/dev/developing-producers-with-kpl.html # developpement-producteurs-avec-avantage-kpl). Ces KPL et KCL sont écrits en Java. Par conséquent, j'ai décidé d'apprendre à l'utiliser en Java. (Pour être précis, KPL encapsule les modules écrits en C ++ en Java Ou KCL [Utiliser le wrapper Java avec Python, Node, etc.](https://docs.aws.amazon.com/ja_jp/streams/latest/dev/developing-consumers-with-kcl.html#kinesis -record-processor-overview-kcl) est également possible)

Tutoriel officiel

Essayons Tutoriel: Visualiser le trafic Web à l'aide des flux de données Amazon Kinesis pense. En gros, vous pouvez continuer en cliquant simplement, alors notez les points qui vous intéressent en lisant le document ci-dessus.

Environnement

Environnement simplement en exécutant ce modèle CFn Est terminé. Concernant les Paramètres ʻApplication Archive, la valeur par défaut est v1.1.1 à partir du 4 juillet 2018, mais comme la dernière version est 1.1.2, https://github.com/aws-samples/amazon-kinesis Réécrire dans -data-visualization-sample / releases / download / v1.1.2 / amazon-kinesis-data-visualization-sample-1.1.2-assembly.zip` (here Vous pouvez consulter la dernière version sur aws-samples / amazon-kinesis-data-visualization-sample / releases). Entrez les valeurs appropriées pour les restrictions de paires de clés et d'adresses SSH. Le statut passera à CREATE_COMPLETE dans environ 5 minutes, vérifiez donc la sortie CFn à partir de l'onglet SORTIES.

Vérification de l'application

Vérifiez le graphique

Si vous accédez à l'adresse affichée dans l'élément URL de l'onglet Sorties (l'application de visualisation est hébergée sur EC2), vous verrez le graphique. Il semble s'agir d'une simulation d'une application dans laquelle un serveur Web publie séquentiellement le référent de l'utilisateur accédant à Kinesis, interroge le résultat, le visualise et l'agrège en temps réel.

Paramètres Kinesis

Je pense qu'un DataStream nommé KinesisDataVisSampleApp a été créé, vérifions donc chaque valeur de paramètre. Jetez un œil à l'onglet Détails.

Fragments

Shard est le nombre de divisions du disque à écrire

Chaque partition peut prendre en charge jusqu'à 5 transactions par seconde et un taux total maximum de lectures de données de 2 Mo par seconde. La partition prend également en charge jusqu'à 1 000 enregistrements par seconde pour les écritures et un taux d'écriture total maximal pour les données de 1 Mo par seconde (y compris les clés de partition). La capacité totale du flux est la capacité totale du fragment.

Comme indiqué dans le document officiel, les performances par partition sont fixes et les performances sont ajustées en augmentant ou en diminuant cette valeur. Fondamentalement, plus la valeur est élevée, meilleures sont les performances, mais la clé de partition (décrite plus loin) doit être suffisamment supérieure au nombre de fragments. Je pense que l'application créée cette fois est définie sur 2.

Server-side encryption Lorsqu'elle est activée, les données peuvent être chiffrées à l'aide de KMS.

Data retention period Les données peuvent être conservées dans Kinesis et peuvent être définies dans une plage de 24 à 168 heures. (Bien sûr, plus il est long, plus il sera facturé)

Shard level metrics Vous pouvez voir les métriques au niveau de la partition, ce qui vous aide à voir si les données sont efficacement réparties entre les partitions. Cela entraîne également des frais supplémentaires.

Surveillance depuis la console

Si vous regardez l'onglet Surveillance, vous verrez un graphique de chaque métrique. Il vous suffit de noter que les valeurs réelles sont affichées en bleu et que les éléments affichés en rouge indiquent les limites à la valeur actuelle du fragment (seule la ligne rouge est affichée). Ça marche, mais ça ne marche pas vraiment)

Producteur de données

Le côté qui écrit à Kinesis s'appelle le producteur. Cette application simule le fait de pousser le référent de la personne qui y a accédé, mais l'adresse est sélectionnée au hasard parmi 6 URL et renvoyée à Kinesis.

Ici Il y a un code côté producteur, donc je vais le vérifier.

(Je ne comprends pas du tout Java **, alors je vais le commenter **)

HttpReferrerKinesisPutter.java


// package:Nommez les diviseurs d'espace, utilisez apparemment des domaines détenus pour éviter les conflits avec quiconque dans le monde...
package com.amazonaws.services.kinesis.samples.datavis.producer;
// import:Semblable à Python, la dernière (IOException sur cette ligne) est globalement introduite dans l'espace de noms de ce fichier
import java.io.IOException;  // java.La bibliothèque standard commence par
import java.nio.ByteBuffer;  //Tampon (nio: non-Ça semble bloquer io, je vois)
import java.util.concurrent.TimeUnit;  //Bibliothèque d'opérations de commodité du temps

import org.apache.commons.logging.Log;  // commons-Une bibliothèque qui fournit une interface de journalisation unifiée, qui semble journaliser
import org.apache.commons.logging.LogFactory;  //Usine utilisée comme ensemble avec ↑

// AWS SDK for Java
import com.amazonaws.AmazonClientException;
import com.amazonaws.services.kinesis.AmazonKinesis;  
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
import com.amazonaws.services.kinesis.model.PutRecordRequest;
import com.amazonaws.services.kinesis.samples.datavis.model.HttpReferrerPair;  //Générateur de références (l'espace de noms est compliqué, mais les exemples et ci-dessous sont dans le code de ce tutoriel)

import com.fasterxml.jackson.databind.ObjectMapper;  // Jackson:Bibliothèque d'analyseur JSON pour Java

/**
 *Envoyer une paire de références HTTP à Kinesis
 */
public class HttpReferrerKinesisPutter {
    private static final Log LOG = LogFactory.getLog(HttpReferrerKinesisPutter.class);  //Journal, semble manger sa propre classe

    private HttpReferrerPairFactory referrerFactory;  //Usine pour la génération de références
    private AmazonKinesis kinesis;  // AWS SDK
    private String streamName;

    private final ObjectMapper JSON = new ObjectMapper();  //Convertir un objet Java ↔︎ JSON via ce type

    //Reçoit la fabrique de génération de références, AWS SDK et le nom du flux Kineisi dans lequel écrire et écrit dans Kinesis.
    //En Java, il devient un constructeur avec une déclaration de méthode qui a le même nom que le nom de la classe et aucune valeur de retour.
    public HttpReferrerKinesisPutter(HttpReferrerPairFactory pairFactory, AmazonKinesis kinesis, String streamName) {
        //Gestion des erreurs d'argument
        if (pairFactory == null) {
            throw new IllegalArgumentException("pairFactory must not be null");
        }
        if (kinesis == null) {
            throw new IllegalArgumentException("kinesis must not be null");
        }
        if (streamName == null || streamName.isEmpty()) {
            throw new IllegalArgumentException("streamName must not be null or empty");
        }
        this.referrerFactory = pairFactory;  //Classe Hmmm
        this.kinesis = kinesis;
        this.streamName = streamName;
    }

    //Quelque chose comme ↓ s'appelle Javadoc@Les documents peuvent être générés en annotant avec (docstring et Sphinx-like)
    /**
     *Envoyez un nombre fixe de paires de références HTTP à Kinesis. Ceux-ci sont envoyés séquentiellement
     *Si vous voulez un débit, plusieurs{@link HttpReferrerKinesisPutter}Les usages
     *
     * @param n Nombre de paires envoyées à Kinesis
     * @param delayB BetweenRecords Temps d'attente entre les transmissions d'enregistrement, ignoré s'il est inférieur à 0
     * @param unitForDelay L'unité de temps utilisée comme temps d'attente
     *
     * @jette une exception InterruptedException en cas d'interruption avant l'envoi de la paire suivante
     */
    public void sendPairs(long n, long delayBetweenRecords, TimeUnit unitForDelay) throws InterruptedException {
        for (int i = 0; i < n && !Thread.currentThread().isInterrupted(); i++) {  //Obtenir le fil de discussion actuel avec currentThread
            sendPair();  //Envoyer
            Thread.sleep(unitForDelay.toMillis(delayBetweenRecords));  //Attendre
        }
    }

    /**
     *Envoyez des paires de référents HTTP à Kinesis à l'infini, arrêtez uniquement en cas d'interruption
     *Multiple lorsque vous voulez du débit{@link HttpReferrerKinesisPutter}Pensez à utiliser s
     *
     * @param delayB BetweenRecords Temps d'attente entre les transmissions d'enregistrement, ignoré s'il est inférieur à 0
     * @param unitForDelay L'unité de temps utilisée comme temps d'attente
     *
     * @jette une exception InterruptedException en cas d'interruption avant l'envoi de la paire suivante
     */
    public void sendPairsIndefinitely(long delayBetweenRecords, TimeUnit unitForDelay) throws InterruptedException {
        while (!Thread.currentThread().isInterrupted()) {
            sendPair();
            if (delayBetweenRecords > 0) {
                Thread.sleep(unitForDelay.toMillis(delayBetweenRecords));
            }
        }
    }

    /**
     *Envoyer une seule paire à Kinesis à l'aide de PutRecord
     */
    private void sendPair() {
        HttpReferrerPair pair = referrerFactory.create();  //Générer une paire de références
        byte[] bytes;  //Ceci déclare "octets de liste de type octet"
        try {
            bytes = JSON.writeValueAsBytes(pair);  //UTF la paire-8 tableau d'octets codés et JSONized (sérialisé)
        } catch (IOException e) {
            LOG.warn("Skipping pair. Unable to serialize: '" + pair + "'", e);
            return;
        }

        PutRecordRequest putRecord = new PutRecordRequest();
        putRecord.setStreamName(streamName);
        //En utilisant des ressources comme clés de partition, le total des ressources données peut être calculé avec précision.
        putRecord.setPartitionKey(pair.getResource());
        putRecord.setData(ByteBuffer.wrap(bytes));  //Probablement, quels que soient les octets, réservez de la mémoire pour le moment et définissez?Enquête requise
        //N'envoyez pas SequenceNumberForOrdering car la commande n'a pas d'importance pour cette application
        putRecord.setSequenceNumberForOrdering(null);

        try {
            kinesis.putRecord(putRecord);
        } catch (ProvisionedThroughputExceededException ex) {  //Lorsque le débit est dépassé
            //Sortie si le journal est activé
            if (LOG.isDebugEnabled()) {
                LOG.debug(String.format("Thread %s's Throughput exceeded. Waiting 10ms", Thread.currentThread().getName()));
            }
            //Attendez 10 secondes
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        } catch (AmazonClientException ex) {
            LOG.warn("Error sending record to Amazon Kinesis.", ex);
        }
    }
}

Il s'agit de [HttpReferrerStreamWriter.java](https://github.com/aws-samples/amazon-kinesis-data-visualization-sample/blob/master/src/main/java/com/amazonaws/services/kinesis/samples/ J'écris en appelant de la classe principale de datavis / HttpReferrerStreamWriter.java). C'est écrit correctement, donc c'est long, mais tout ce que vous avez à faire est d'écrire les données sur Kinesis et de l'exécuter sans fin. Bien que je sois producteur, j'écris en utilisant le kit SDK AWS sans utiliser KPL.

Consommateur de données

Au contraire, le côté qui acquiert des données est appelé un consommateur de données. Dans cette application, il est persistant en acquérant et en agrégeant à partir du flux de données pendant un nombre fixe de secondes et en l'écrivant dans Dynamo (il semble que l'application Web le visualisera après cela).

Cela utilise KCL au lieu d'appeler directement l'API. C'est un peu long, mais [CountingRecordProcessor.java](https://github.com/aws-samples/amazon-kinesis-data-visualization-sample/blob/master/src/main/java/com/amazonaws/services/kinesis/ Jetons un coup d'œil à samples / datavis / kcl / CountingRecordProcessor.java).

CountingRecordProcessor.java


package com.amazonaws.services.kinesis.samples.datavis.kcl;

import java.io.IOException;
import java.util.List;
import java.util.Map;
// concurrent:Bibliothèque pour le multithreading
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
// kcl
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.samples.datavis.kcl.counter.SlidingWindowCounter;
import com.amazonaws.services.kinesis.samples.datavis.kcl.persistence.CountPersister;
import com.amazonaws.services.kinesis.samples.datavis.kcl.timing.Clock;
import com.amazonaws.services.kinesis.samples.datavis.kcl.timing.NanoClock;
import com.amazonaws.services.kinesis.samples.datavis.kcl.timing.Timer;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * (HttpReferrerPair -> count(pair))Calcul de la cartographie dans une largeur de temps fixe. Les comptages sont calculés pendant un intervalle donné
 * 
 * @param <T>Types d'enregistrements que ce processeur peut compter: <>Génériques express avec(Utilisé lors de la création d'objets prenant différents types comme arguments)
 */
public class CountingRecordProcessor<T> implements IRecordProcessor {  //Vous devez implémenter l'interface dans les implements et l'IRecordProcessor dans KCL
    private static final Log LOG = LogFactory.getLog(CountingRecordProcessor.class);

    //Verrouiller pour utiliser la minuterie
    private static final Clock NANO_CLOCK = new NanoClock();
    //Minuterie pour la planification des points de contrôle
    private Timer checkpointTimer = new Timer(NANO_CLOCK);

    //Mappeur d'objets JSON pour désérialiser les enregistrements
    private final ObjectMapper JSON;

    //Intervalle jusqu'à ce que le compte distinct soit calculé
    private int computeIntervalInMillis;
    //Temps total prévu lors du calcul du total
    private int computeRangeInMillis;

    //Compteur pour contenir le compte par intervalle
    private SlidingWindowCounter<T> counter;

    //Shard que ce processeur effectue le calcul
    private String kinesisShardId;

    //Planifier les mises à jour du nombre à un taux fixe (computeIntervalInMillis) pour différents threads
    private ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(1);

    //Implémentation de la persistance du nombre pour chaque intervalle
    private CountPersister<T> persister;

    private CountingRecordProcessorConfig config;

    //Le type d'enregistrement que vous recevrez en JSON
    private Class<T> recordType;

    /**
     *Générer un nouveau processeur
     *
     * @param config config de ce processeur d'enregistrement
     * @param recordType UTF-8 Type d'enregistrement qui sera reçu sous forme de chaîne JSON
     * @param persister Nombre qui sera conservé dans ce processeur d'enregistrement
     * @param computeRangeInMillis La plage pour calculer le nombre distinct
     * @param computeIntervalInMillis Intervalle de calcul de tous les comptes pour tous les temps
     */
    public CountingRecordProcessor(CountingRecordProcessorConfig config,
            Class<T> recordType,
            CountPersister<T> persister,
            int computeRangeInMillis,
            int computeIntervalInMillis) {
        if (config == null) {
            throw new NullPointerException("config must not be null");
        }
        if (recordType == null) {
            throw new NullPointerException("recordType must not be null");
        }
        if (persister == null) {
            throw new NullPointerException("persister must not be null");
        }
        if (computeRangeInMillis <= 0) {
            throw new IllegalArgumentException("computeRangeInMillis must be > 0");
        }
        if (computeIntervalInMillis <= 0) {
            throw new IllegalArgumentException("computeIntervalInMillis must be > 0");
        }
        if (computeRangeInMillis % computeIntervalInMillis != 0) {
            throw new IllegalArgumentException("compute range must be evenly divisible by compute interval to support "
                    + "accurate intervals");
        }

        this.config = config;
        this.recordType = recordType;
        this.persister = persister;
        this.computeRangeInMillis = computeRangeInMillis;
        this.computeIntervalInMillis = computeIntervalInMillis;

        //Créer un mappeur d'objets pour les enregistrements désérialisés, en ignorant les propriétés inconnues
        JSON = new ObjectMapper();
        JSON.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    }

    @Override  //Vérifie les annotations, les fautes d'orthographe, etc.
    public void initialize(String shardId) {
        kinesisShardId = shardId;
        resetCheckpointAlarm();

        persister.initialize();

        //Créez une fenêtre glissante suffisamment grande pour contenir toute la plage de comptages pour chaque intervalle
        counter = new SlidingWindowCounter<>((int) (computeRangeInMillis / computeIntervalInMillis));

        //Générez des tâches planifiées qui effectuent des calculs et conservent les décomptes pour chaque computeIntervalInMillis
        scheduledExecutor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                //Synchronisez les compteurs, alors arrêtez d'avancer les intervalles lors de la création de points de contrôle
                synchronized (counter) {
                    try {
                        advanceOneInterval();
                    } catch (Exception ex) {
                        LOG.warn("Error advancing sliding window one interval (" + computeIntervalInMillis
                                + "ms). Skipping this interval.", ex);
                    }
                }
            }
        },
                TimeUnit.SECONDS.toMillis(config.getInitialWindowAdvanceDelayInSeconds()),
                computeIntervalInMillis,
                TimeUnit.MILLISECONDS);
    }

    /**
     *Avance le compteur de la fenêtre glissante interne de 1 à intervalles, active la persistance du comptage si la fenêtre est pleine
     */
    protected void advanceOneInterval() {
        Map<T, Long> counts = null;
        synchronized (counter) {
            //Conserver le décompte uniquement lors de la conservation des données pour toute la plage. Pas besoin de compter chaque pièce au début du processus.
            if (shouldPersistCounts()) {
                counts = counter.getCounts();
                counter.pruneEmptyObjects();
            } else {
                if (LOG.isDebugEnabled()) {
                    LOG.debug(String.format("We have not collected enough interval samples to calculate across the "
                            + "entire range from shard %s. Skipping this interval.", kinesisShardId));
                }
            }
            //Avancez la fenêtre "une aiguille"
            counter.advanceWindow();
        }
        //Persistance des données si vous conservez toute la plage
        if (counts != null) {
            persister.persist(counts);
        }
    }

    @Override
    public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
        for (Record r : records) {
            // Deserialize each record as an UTF-8 encoded JSON String of the type provided
            //UTF chaque enregistrement pour le type passé-Désérialiser en 8 chaînes JSON
            T pair;
            try {
                pair = JSON.readValue(r.getData().array(), recordType);
            } catch (IOException e) {
                LOG.warn("Skipping record. Unable to parse record into HttpReferrerPair. Partition Key: "
                        + r.getPartitionKey() + ". Sequence Number: " + r.getSequenceNumber(),
                        e);
                continue;
            }
            //Incrémentez le compteur pour une nouvelle paire. Synchrone car il existe d'autres threads qui lisent à partir du compteur et calculent le total à chaque intervalle.
            synchronized (counter) {
                counter.increment(pair);
            }
        }

        //Checkpoint à ce moment
        if (checkpointTimer.isTimeUp()) {
            //Verrouiller pour éviter que des comptages supplémentaires soient effectués pendant les points de contrôle
            synchronized (counter) {
                checkpoint(checkpointer);
                resetCheckpointAlarm();
            }
        }
    }

    /**
     *Avant de conserver un décompte, vous devez collecter suffisamment de données d'exemple dans toutes les plages de fenêtres
     *
     * @return {@code true}Collectez-vous tous les décomptes pour toutes les plages et collectez-vous suffisamment de données pour la persistance?
     */
    private boolean shouldPersistCounts() {
        return counter.isWindowFull();
    }

    @Override
    public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
        LOG.info("Shutting down record processor for shard: " + kinesisShardId);

        scheduledExecutor.shutdown();
        try {
            //Attendez 30 secondes pour que la tâche de service exécuteur se termine
            if (!scheduledExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
                LOG.warn("Failed to properly shut down interval thread pool for calculating interval counts and persisting them. Some counts may not have been persisted.");
            } else {
                //Point de contrôle uniquement lorsque le pool de threads est arrêté avec succès
                //Il est important de vérifier le point après avoir atteint la fin de la partition, ce qui permet à la partition enfant de commencer à traiter les données
                if (reason == ShutdownReason.TERMINATE) {
                    synchronized (counter) {
                        checkpoint(checkpointer);
                    }
                }
            }
        } catch (InterruptedException ie) {
            //Ne pas vérifier si l'arrêt propre échoue
            scheduledExecutor.shutdownNow();
            //Traitez cette erreur de la même manière que l'hôte ou le processus se bloque ou JVM Abort
            LOG.fatal("Couldn't successfully persist data within the max wait time. Aborting the JVM to mimic a crash.");
            System.exit(1);
        }
    }

    /**
     *Définir une minuterie pour le prochain point de contrôle
     */
    private void resetCheckpointAlarm() {
        checkpointTimer.alarmIn(config.getCheckpointIntervalInSeconds(), TimeUnit.SECONDS);
    }

    /**
     *Point de contrôle avec nouvelle tentative
     *
     * @param checkpointer
     */
    private void checkpoint(IRecordProcessorCheckpointer checkpointer) {
        LOG.info("Checkpointing shard " + kinesisShardId);
        for (int i = 0; i < config.getCheckpointRetries(); i++) {
            try {
                //Vérifiez d'abord la persistance pour vous assurer que tous les décomptes calculés ont été conservés
                persister.checkpoint();
                checkpointer.checkpoint();
                return;
            } catch (ShutdownException se) {
                //Ignorer les points de contrôle si l'instance de processeur s'arrête (échec)
                LOG.info("Caught shutdown exception, skipping checkpoint.", se);
                return;
            } catch (ThrottlingException e) {
                //Panne transitoire)Dans le cas, revenez en arrière et réessayez
                if (i >= (config.getCheckpointRetries() - 1)) {
                    LOG.error("Checkpoint failed after " + (i + 1) + "attempts.", e);
                    break;
                } else {
                    LOG.info("Transient issue when checkpointing - attempt " + (i + 1) + " of "
                            + config.getCheckpointRetries(),
                            e);
                }
            } catch (InvalidStateException e) {
                //Indique un problème avec DynamoDB (vérifie les tables et les IOPS)
                LOG.error("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", e);
                break;
            } catch (InterruptedException e) {
                LOG.error("Error encountered while checkpointing count persister.", e);
                //Échec lors de la nouvelle tentative
            }
            try {
                Thread.sleep(config.getCheckpointBackoffTimeInSeconds());
            } catch (InterruptedException e) {
                LOG.debug("Interrupted sleep", e);
            }
        }
        //Traitez cette erreur de la même manière que l'hôte ou le processus se bloque ou JVM Abort
        LOG.fatal("Couldn't successfully persist data within max retry limit. Aborting the JVM to mimic a crash.");
        System.exit(1);
    }
}

Avancé (avancé) C'est un peu délicat pour les débutants Java, mais [Implémenter la méthode IRecordProcessor](https://docs.aws.amazon.com/ja_jp/streams/latest/dev/kinesis- record-processor-implementation-app-java.html # kinesis-record-processor-implementation-interface-java) est fait dans ce fichier (veuillez pardonner le commentaire La traduction japonaise est difficile).

--ʻInitialize () : initialiser, identifier la partition à traiter --processRecords () : enregistrements de processus --shutdown () `: Fin du traitement

Implémente les trois méthodes de pour spécifier une série de processus, et processRecord reçoit la méthode checkpoint () pour suivre les enregistrements qui ont déjà été traités. Celles-ci contrôlent la partie de la partition avec laquelle vous travaillez.

En fait, en faisant glisser la fenêtre pendant 2 secondes, il semble que cela compte et sauve les 3 meilleurs téléspectateurs (je ne comprends pas encore complètement cette partie, travaux futurs.)

DynamoDB KCL crée une table dans DynamoDB pour conserver les informations sur l'état de l'application (point de contrôle 'et correspondance shard-worker). Lorsque vous exécutez réellement l'application, vous pouvez voir que la table DynamoDB a été créée. L'un est pour stocker les résultats du comptage et l'autre pour cette table de gestion d'état (KinesisDataVisSampleApp-KCLDynamoDBTable- [randomString]). Si vous regardez à l'intérieur, vous trouverez diverses clés telles que bailKey et checkpoint`. Ceux-ci sont utilisés pour gérer l'état de la phrase en cours de lecture. Il y a Description de chaque clé dans ce document.

Je vais sauter la compréhension du contenu pour le moment, mais il faudra changer le fragment en fonction de la vitesse de lecture.


J'ai étudié jusqu'à présent, mais [Tutoriel: Premiers pas avec Amazon Kinesis Data Streams à l'aide de l'AWS CLI](https://docs.aws.amazon.com/ja_jp/streams/latest/dev/kinesis-tutorial- cli.html) ressemble au prochain tutoriel, je vais donc essayer de le toucher via CLI.

Recommended Posts

Flux de données Kinesis à partir de zéro expérience Java (1)
Flux de données Kinesis à partir de zéro expérience Java (3.1)
Flux de données Kinesis à partir de zéro expérience Java (3.2)
Traitement des données à l'aide de l'API de flux de Java 8
Utiliser le type de données PostgreSQL (jsonb) à partir de Java
À propos des données locales CLDR activées par défaut à partir de Java 9
Appeler Java depuis JRuby
Changements de Java 8 à Java 11
Somme de Java_1 à 100
Expérience de passage Java Silver
Évaluer la source Java à partir de Java
[Java] Type de données ①-Type de base
Accédez à API.AI depuis Java
De Java à Ruby !!
[Java] Principaux types de données
Types de données de base Java
Java qui ignore les données d'Android vers le ROS de Jetson Nano
Obtenez les prévisions météorologiques de Watson Weather Company Data avec Java simple
Logiciel pratique CData (obtenir des données kintone à partir de l'application console Java)
Logiciel pratique CData (obtenir des données Twitter à partir de l'application console Java)