Kinesis-Datenströme ohne Java-Erfahrung (1)

Ich habe ein Bedürfnis, also werde ich versuchen, Kinesis in Java zu handhaben.

--AWS wird berührt, aber Skriptsprachen (Python, Javascript) sind die Hauptsprachen

Eine Studienaufzeichnung eines Ingenieurs.

Übersicht über Amazon Kinesis-Datenströme

img Aus Entwicklerdokumentation

Grob gesagt ein Warteschlangendienst für große Datenmengen. Es gibt andere Warteschlangendienste wie SQS und Amazon MQ, insbesondere AWS IoT und Step Functions. Kinesis ist jedoch ein Dienst, der sich insbesondere auf große Datenmengen konzentriert.

Ich denke, das ist ein großes Feature. des Weiteren

Eine der Funktionen ist, dass es fertig ist. Wie bei anderen AWS-Diensten handelt es sich natürlich um einen API-Aufruf, sodass er über die AWS CLI und das AWS SDK verwendet werden kann. Auf der anderen Seite die Verwendung von KPL und KCL Vorteile in Bezug auf Leistung und Zusammenarbeit zwischen den beiden # Entwickler-mit-kpl-Vorteil entwickeln). Diese KPL und KCL sind in Java geschrieben. Deshalb habe ich mich entschlossen zu lernen, wie man es in Java benutzt. (Um genau zu sein, KPL umschließt in C ++ in Java geschriebene Module ) Oder KCL [Java-Wrapper mit Python, Node usw. verwenden](https://docs.aws.amazon.com/ja_jp/streams/latest/dev/developing-consumers-with-kcl.html#kinesis -record-Prozessor-Übersicht-kcl) ist auch möglich)

Offizielles Tutorial

Versuchen wir es mit Tutorial: Visualisieren des Webverkehrs mithilfe von Amazon Kinesis-Datenströmen. Überlegen. Grundsätzlich können Sie fortfahren, indem Sie einfach auf klicken. Notieren Sie sich also die Punkte, an denen Sie interessiert sind, während Sie das obige Dokument lesen.

Umgebung

Umgebung nur durch Ausführen von dieser CFn-Vorlage Abgeschlossen. In Bezug auf "Anwendungsarchiv" von "Parametern" ist die Standardeinstellung v1.1.1 ab dem 4. Juli 2018, aber da die neueste Version 1.1.2 ist, lautet "https://github.com/aws-samples/amazon-kinesis" Schreiben Sie zu -data-visualization-sample / release / download / v1.1.2 / amazon-kinesis-data-visualization-sample-1.1.2 -embly.zip` (hier Sie können die neueste Version unter aws-samples / amazon-kinesis-data-visualization-sample / release überprüfen. Geben Sie die entsprechenden Werte für die Einschränkungen für Schlüsselpaare und SSH-Adressen ein. Der Status ändert sich in ca. 5 Minuten in CREATE_COMPLETE. Überprüfen Sie daher die CFn-Ausgabe auf der Registerkarte OUTPUTS.

Überprüfen der Anwendung

Überprüfen Sie die Grafik

Wenn Sie auf die Adresse zugreifen, die im URL-Element der Registerkarte "Ausgaben" angezeigt wird (die Visualisierungsanwendung wird auf EC2 gehostet), wird das Diagramm angezeigt. Es scheint sich um eine Simulation einer Anwendung zu handeln, bei der ein Webserver den Referer des zugreifenden Benutzers nacheinander an Kinesis veröffentlicht, das Ergebnis abfragt und es in Echtzeit visualisiert und aggregiert.

Kinesis-Einstellungen

Ich denke, dass ein DataStream mit dem Namen KinesisDataVisSampleApp erstellt wurde. Überprüfen wir also jeden Einstellungswert. Schauen Sie sich die Registerkarte Details an.

Scherben

Shard ist die Anzahl der Unterteilungen des zu schreibenden Datensatzes

Jeder Shard kann bis zu 5 Transaktionen pro Sekunde und eine maximale Gesamtleserate von 2 MB pro Sekunde unterstützen. Der Shard unterstützt außerdem bis zu 1.000 Datensätze pro Sekunde für Schreibvorgänge und eine maximale Gesamtschreibgeschwindigkeit von 1 MB pro Sekunde (einschließlich Partitionsschlüssel). Die Gesamtkapazität des Streams ist die Gesamtkapazität des Shards.

Wie im offiziellen Dokument angegeben, ist die Leistung pro Shard festgelegt und die Leistung wird durch Erhöhen oder Verringern dieser Leistung angepasst. Grundsätzlich gilt: Je größer der Wert, desto besser die Leistung. Der Partitionsschlüssel (später beschrieben) muss jedoch ausreichend größer sein als die Anzahl der Shards. Ich denke, dass die diesmal erstellte Anwendung auf 2 gesetzt ist.

Server-side encryption Im eingeschalteten Zustand können Daten mit KMS verschlüsselt werden.

Data retention period Die Daten können in Kinesis gespeichert und im Bereich von 24 bis 168 Stunden eingestellt werden. (Je länger es dauert, desto mehr wird es natürlich berechnet.)

Shard level metrics Sie können die Metriken auf Shard-Ebene anzeigen, um festzustellen, ob die Daten effizient auf Shards verteilt sind. Hierfür fällt ebenfalls eine zusätzliche Gebühr an.

Überwachung von der Konsole aus

Wenn Sie sich die Registerkarte Überwachung ansehen, sehen Sie ein Diagramm jeder Metrik. Sie müssen nur beachten, dass die tatsächlichen Werte blau angezeigt werden und die rot angezeigten Elemente die Grenzwerte für den aktuellen Shard-Wert angeben (nur die rote Linie wird angezeigt). Es funktioniert, aber es funktioniert nicht wirklich.

Datenproduzent

Die Seite, die an Kinesis schreibt, heißt Produzent. Diese Anwendung simuliert das Drücken des Referers der Person, die darauf zugegriffen hat, aber die Adresse wird zufällig aus 6 URLs ausgewählt und auf Kinesis geworfen.

Hier Es gibt einen Code auf der Herstellerseite, also werde ich ihn überprüfen.

(Ich verstehe Java überhaupt nicht **, also werde ich es kommentieren **)

HttpReferrerKinesisPutter.java


// package:Nennen Sie Raumteiler, verwenden Sie anscheinend eigene Domänen, um Konflikte mit irgendjemandem auf der Welt zu verhindern...
package com.amazonaws.services.kinesis.samples.datavis.producer;
// import:Ähnlich wie bei Python wird die letzte (IOException in dieser Zeile) global im Namespace dieser Datei eingeführt
import java.io.IOException;  // java.Standardbibliothek beginnt mit
import java.nio.ByteBuffer;  //Puffer (nio: non-Es scheint zu blockieren, ich verstehe)
import java.util.concurrent.TimeUnit;  //Zeitkomfort-Betriebsbibliothek

import org.apache.commons.logging.Log;  // commons-Eine Bibliothek, die eine einheitliche Protokollierungsschnittstelle bereitstellt, die anscheinend protokolliert wird
import org.apache.commons.logging.LogFactory;  //Werkseitig als Set mit ↑ verwendet

// AWS SDK for Java
import com.amazonaws.AmazonClientException;
import com.amazonaws.services.kinesis.AmazonKinesis;  
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
import com.amazonaws.services.kinesis.model.PutRecordRequest;
import com.amazonaws.services.kinesis.samples.datavis.model.HttpReferrerPair;  //Referrer-Generator (Namespace ist kompliziert, aber Beispiele und darunter sind im Code dieses Tutorials enthalten)

import com.fasterxml.jackson.databind.ObjectMapper;  // Jackson:JSON-Parser-Bibliothek für Java

/**
 *Senden Sie ein Paar HTTP-Verweise an Kinesis
 */
public class HttpReferrerKinesisPutter {
    private static final Log LOG = LogFactory.getLog(HttpReferrerKinesisPutter.class);  //Log, scheint seine eigene Klasse zu essen

    private HttpReferrerPairFactory referrerFactory;  //Fabrik zur Generierung von Empfehlungen
    private AmazonKinesis kinesis;  // AWS SDK
    private String streamName;

    private final ObjectMapper JSON = new ObjectMapper();  //Konvertieren Sie das Java-Objekt ↔︎ JSON durch diesen Typen

    //Schreiben Sie an Kinesis, nachdem Sie die Empfehlungsgenerierungsfactory AWS SDK und den Namen des Ziel-Kineisi-Streams erhalten haben.
    //In Java wird es zu einem Konstruktor mit einer Methodendeklaration, die denselben Namen wie der Klassenname und keinen Rückgabewert hat.
    public HttpReferrerKinesisPutter(HttpReferrerPairFactory pairFactory, AmazonKinesis kinesis, String streamName) {
        //Behandlung von Argumentfehlern
        if (pairFactory == null) {
            throw new IllegalArgumentException("pairFactory must not be null");
        }
        if (kinesis == null) {
            throw new IllegalArgumentException("kinesis must not be null");
        }
        if (streamName == null || streamName.isEmpty()) {
            throw new IllegalArgumentException("streamName must not be null or empty");
        }
        this.referrerFactory = pairFactory;  //Hmmm Klasse
        this.kinesis = kinesis;
        this.streamName = streamName;
    }

    //So etwas wie ↓ heißt Javadoc@Dokumente können durch Annotieren mit (docstring und Sphinx-like) generiert werden.
    /**
     *Senden Sie eine feste Anzahl von HTTP-Empfehlungspaaren an Kinesis. Diese werden nacheinander gesendet
     *Wenn Sie einen Durchsatz wünschen, mehrere{@link HttpReferrerKinesisPutter}Verwendet
     *
     * @param n Anzahl der an Kinesis gesendeten Paare
     * @param delayBetweenRecords Wartezeit zwischen Datensatzübertragungen, ignoriert, wenn weniger als 0
     * @param unitForDelay Die Zeiteinheit, die als Wartezeit verwendet wird
     *
     * @löst InterruptedException Exception aus, wenn es unterbrochen wird, bevor das nächste Paar gesendet wird
     */
    public void sendPairs(long n, long delayBetweenRecords, TimeUnit unitForDelay) throws InterruptedException {
        for (int i = 0; i < n && !Thread.currentThread().isInterrupted(); i++) {  //Holen Sie sich den aktuellen Thread mit currentThread
            sendPair();  //Senden
            Thread.sleep(unitForDelay.toMillis(delayBetweenRecords));  //Warten
        }
    }

    /**
     *Senden Sie HTTP-Referrer-Paare endlos an Kinesis. Halten Sie nur an, wenn Sie unterbrochen werden
     *Mehrfach, wenn Sie einen Durchsatz wünschen{@link HttpReferrerKinesisPutter}Erwägen Sie die Verwendung von s
     *
     * @param delayBetweenRecords Wartezeit zwischen Datensatzübertragungen, ignoriert, wenn weniger als 0
     * @param unitForDelay Die Zeiteinheit, die als Wartezeit verwendet wird
     *
     * @löst InterruptedException Exception aus, wenn es unterbrochen wird, bevor das nächste Paar gesendet wird
     */
    public void sendPairsIndefinitely(long delayBetweenRecords, TimeUnit unitForDelay) throws InterruptedException {
        while (!Thread.currentThread().isInterrupted()) {
            sendPair();
            if (delayBetweenRecords > 0) {
                Thread.sleep(unitForDelay.toMillis(delayBetweenRecords));
            }
        }
    }

    /**
     *Senden Sie ein einzelnes Paar mit PutRecord an Kinesis
     */
    private void sendPair() {
        HttpReferrerPair pair = referrerFactory.create();  //Generieren Sie ein Paar von Empfehlungen
        byte[] bytes;  //Dies deklariert "Byte Typ Liste Bytes"
        try {
            bytes = JSON.writeValueAsBytes(pair);  //UTF das Paar-8 Codiertes Byte-Array und JSONized (serialisiert)
        } catch (IOException e) {
            LOG.warn("Skipping pair. Unable to serialize: '" + pair + "'", e);
            return;
        }

        PutRecordRequest putRecord = new PutRecordRequest();
        putRecord.setStreamName(streamName);
        //Durch die Verwendung von Ressourcen als Partitionsschlüssel kann die Summe der angegebenen Ressourcen genau berechnet werden.
        putRecord.setPartitionKey(pair.getResource());
        putRecord.setData(ByteBuffer.wrap(bytes));  //Unabhängig von den Bytes reservieren Sie wahrscheinlich vorerst Speicher und setzen Sie ihn?Untersuchung erforderlich
        //Senden Sie SequenceNumberForOrdering nicht, da die Reihenfolge für diese Anwendung keine Rolle spielt
        putRecord.setSequenceNumberForOrdering(null);

        try {
            kinesis.putRecord(putRecord);
        } catch (ProvisionedThroughputExceededException ex) {  //Wenn der Durchsatz überschritten wird
            //Ausgabe, wenn das Protokoll eingeschaltet ist
            if (LOG.isDebugEnabled()) {
                LOG.debug(String.format("Thread %s's Throughput exceeded. Waiting 10ms", Thread.currentThread().getName()));
            }
            //Warten Sie 10 Sekunden
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        } catch (AmazonClientException ex) {
            LOG.warn("Error sending record to Amazon Kinesis.", ex);
        }
    }
}

Dies ist [HttpReferrerStreamWriter.java](https://github.com/aws-samples/amazon-kinesis-data-visualization-sample/blob/master/src/main/java/com/amazonaws/services/kinesis/samples/ Ich schreibe, indem ich aus der Hauptklasse von datavis (HttpReferrerStreamWriter.java) aufrufe. Es ist richtig geschrieben, es ist also eine lange Zeit, aber alles, was Sie tun müssen, ist, die Daten in Kinesis zu schreiben und sie dann endlos auszuführen. Obwohl ich Produzent bin, schreibe ich mit dem AWS SDK ohne KPL.

Datenkonsument

Im Gegenteil, die Seite, die Daten erfasst, wird als Datenkonsument bezeichnet. In dieser Anwendung wird es beibehalten, indem der Datenstrom für eine feste Anzahl von Sekunden erfasst und aggregiert und in Dynamo geschrieben wird (es scheint, dass die Webanwendung ihn danach visualisiert).

CountingRecordProcessor.java


package com.amazonaws.services.kinesis.samples.datavis.kcl;

import java.io.IOException;
import java.util.List;
import java.util.Map;
// concurrent:Bibliothek für Multithreading
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
// kcl
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.samples.datavis.kcl.counter.SlidingWindowCounter;
import com.amazonaws.services.kinesis.samples.datavis.kcl.persistence.CountPersister;
import com.amazonaws.services.kinesis.samples.datavis.kcl.timing.Clock;
import com.amazonaws.services.kinesis.samples.datavis.kcl.timing.NanoClock;
import com.amazonaws.services.kinesis.samples.datavis.kcl.timing.Timer;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * (HttpReferrerPair -> count(pair))Berechnung der Abbildung innerhalb einer festgelegten Zeitbreite. Die Anzahl wird während eines bestimmten Intervalls berechnet
 * 
 * @param <T>Arten von Datensätzen, die dieser Prozessor zählen kann: <>Express Generika mit(Wird beim Erstellen von Objekten verwendet, die unterschiedliche Typen als Argumente verwenden)
 */
public class CountingRecordProcessor<T> implements IRecordProcessor {  //Sie müssen die Schnittstelle in Geräten und den IRecordProcessor in KCL implementieren
    private static final Log LOG = LogFactory.getLog(CountingRecordProcessor.class);

    //Sperren, um den Timer zu verwenden
    private static final Clock NANO_CLOCK = new NanoClock();
    //Timer zum Planen von Checkpoints
    private Timer checkpointTimer = new Timer(NANO_CLOCK);

    //JSON-Objektzuordnung zum Deserialisieren von Datensätzen
    private final ObjectMapper JSON;

    //Intervall, bis die eindeutige Anzahl berechnet ist
    private int computeIntervalInMillis;
    //Gesamtzeit, die bei der Berechnung der Gesamtsumme erwartet wird
    private int computeRangeInMillis;

    //Zähler, um die Anzahl pro Intervall zu halten
    private SlidingWindowCounter<T> counter;

    //Shard, dass dieser Prozessor die Berechnung durchführt
    private String kinesisShardId;

    //Planen Sie Aktualisierungen der Anzahl mit einer festen Rate (computeIntervalInMillis) für verschiedene Threads
    private ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(1);

    //Implementierte Zählpersistenz für jedes Intervall
    private CountPersister<T> persister;

    private CountingRecordProcessorConfig config;

    //Die Art des Datensatzes, den Sie als JSON erhalten
    private Class<T> recordType;

    /**
     *Neuen Prozessor generieren
     *
     * @param config config dieses Plattenprozessors
     * @param recordType UTF-8 Datensatztyp, der als JSON-Zeichenfolge empfangen wird
     * @param persister Anzahl, die für diesen Datensatzprozessor beibehalten wird
     * @param computeRangeInMillis Der Bereich zur Berechnung der eindeutigen Anzahl
     * @param computeIntervalInMillis Intervall zur Berechnung aller Zählungen für alle Zeiten
     */
    public CountingRecordProcessor(CountingRecordProcessorConfig config,
            Class<T> recordType,
            CountPersister<T> persister,
            int computeRangeInMillis,
            int computeIntervalInMillis) {
        if (config == null) {
            throw new NullPointerException("config must not be null");
        }
        if (recordType == null) {
            throw new NullPointerException("recordType must not be null");
        }
        if (persister == null) {
            throw new NullPointerException("persister must not be null");
        }
        if (computeRangeInMillis <= 0) {
            throw new IllegalArgumentException("computeRangeInMillis must be > 0");
        }
        if (computeIntervalInMillis <= 0) {
            throw new IllegalArgumentException("computeIntervalInMillis must be > 0");
        }
        if (computeRangeInMillis % computeIntervalInMillis != 0) {
            throw new IllegalArgumentException("compute range must be evenly divisible by compute interval to support "
                    + "accurate intervals");
        }

        this.config = config;
        this.recordType = recordType;
        this.persister = persister;
        this.computeRangeInMillis = computeRangeInMillis;
        this.computeIntervalInMillis = computeIntervalInMillis;

        //Erstellen Sie einen Objekt-Mapper für deserialisierte Datensätze, wobei Sie unbekannte Eigenschaften ignorieren
        JSON = new ObjectMapper();
        JSON.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    }

    @Override  //Überprüft auf Anmerkungen, Rechtschreibfehler usw.
    public void initialize(String shardId) {
        kinesisShardId = shardId;
        resetCheckpointAlarm();

        persister.initialize();

        //Erstellen Sie ein Schiebefenster, das groß genug ist, um den gesamten Zählbereich für jedes Intervall aufzunehmen
        counter = new SlidingWindowCounter<>((int) (computeRangeInMillis / computeIntervalInMillis));

        //Generieren Sie geplante Aufgaben, die Berechnungen durchführen und die Anzahl für jedes computeIntervalInMillis beibehalten
        scheduledExecutor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                //Synchronisieren Sie die Zähler. Hören Sie also auf, die Intervalle zu verlängern, während Sie Prüfpunkte erstellen
                synchronized (counter) {
                    try {
                        advanceOneInterval();
                    } catch (Exception ex) {
                        LOG.warn("Error advancing sliding window one interval (" + computeIntervalInMillis
                                + "ms). Skipping this interval.", ex);
                    }
                }
            }
        },
                TimeUnit.SECONDS.toMillis(config.getInitialWindowAdvanceDelayInSeconds()),
                computeIntervalInMillis,
                TimeUnit.MILLISECONDS);
    }

    /**
     *Erhöht den internen Schiebefensterzähler in Intervallen um 1 und aktiviert die Zählpersistenz, wenn das Fenster voll ist
     */
    protected void advanceOneInterval() {
        Map<T, Long> counts = null;
        synchronized (counter) {
            //Behalten Sie die Zählung nur bei, wenn Sie Daten für den gesamten Bereich halten. Es ist nicht erforderlich, jedes Teil zu Beginn des Prozesses zu zählen.
            if (shouldPersistCounts()) {
                counts = counter.getCounts();
                counter.pruneEmptyObjects();
            } else {
                if (LOG.isDebugEnabled()) {
                    LOG.debug(String.format("We have not collected enough interval samples to calculate across the "
                            + "entire range from shard %s. Skipping this interval.", kinesisShardId));
                }
            }
            //Schieben Sie das Fenster "eine Nadel" vor
            counter.advanceWindow();
        }
        //Datenpersistenz, wenn Sie den gesamten Bereich halten
        if (counts != null) {
            persister.persist(counts);
        }
    }

    @Override
    public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
        for (Record r : records) {
            // Deserialize each record as an UTF-8 encoded JSON String of the type provided
            //UTF jeden Datensatz für den übergebenen Typ-Deserialisieren Sie auf 8 JSON-Zeichenfolgen
            T pair;
            try {
                pair = JSON.readValue(r.getData().array(), recordType);
            } catch (IOException e) {
                LOG.warn("Skipping record. Unable to parse record into HttpReferrerPair. Partition Key: "
                        + r.getPartitionKey() + ". Sequence Number: " + r.getSequenceNumber(),
                        e);
                continue;
            }
            //Erhöhen Sie den Zähler für ein neues Paar. Synchron, da andere Threads vom Zähler lesen und die Summe in jedem Intervall berechnen.
            synchronized (counter) {
                counter.increment(pair);
            }
        }

        //Checkpoint zu diesem Zeitpunkt
        if (checkpointTimer.isTimeUp()) {
            //Sperren, um zu verhindern, dass an Kontrollpunkten zusätzliche Zählungen durchgeführt werden
            synchronized (counter) {
                checkpoint(checkpointer);
                resetCheckpointAlarm();
            }
        }
    }

    /**
     *Bevor Sie eine Zählung beibehalten können, müssen Sie in allen Fensterbereichen genügend Beispieldaten erfassen
     *
     * @return {@code true}Sammeln Sie alle Zählungen für alle Bereiche und sammeln Sie genügend Daten für die Persistenz?
     */
    private boolean shouldPersistCounts() {
        return counter.isWindowFull();
    }

    @Override
    public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
        LOG.info("Shutting down record processor for shard: " + kinesisShardId);

        scheduledExecutor.shutdown();
        try {
            //Warten Sie bis zu 30 Sekunden, bis die Executor-Serviceaufgabe abgeschlossen ist
            if (!scheduledExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
                LOG.warn("Failed to properly shut down interval thread pool for calculating interval counts and persisting them. Some counts may not have been persisted.");
            } else {
                //Checkpoint nur, wenn der Thread-Pool erfolgreich heruntergefahren wurde
                //Es ist wichtig, den Punkt nach Erreichen des Endes des Shards zu überprüfen, damit der untergeordnete Shard mit der Verarbeitung der Daten beginnen kann
                if (reason == ShutdownReason.TERMINATE) {
                    synchronized (counter) {
                        checkpoint(checkpointer);
                    }
                }
            }
        } catch (InterruptedException ie) {
            //Überprüfen Sie nicht, ob das saubere Herunterfahren fehlschlägt
            scheduledExecutor.shutdownNow();
            //Behandeln Sie diesen Fehler genauso wie Host- oder Prozessabstürze oder JVM-Abbruch
            LOG.fatal("Couldn't successfully persist data within the max wait time. Aborting the JVM to mimic a crash.");
            System.exit(1);
        }
    }

    /**
     *Stellen Sie einen Timer für den nächsten Prüfpunkt ein
     */
    private void resetCheckpointAlarm() {
        checkpointTimer.alarmIn(config.getCheckpointIntervalInSeconds(), TimeUnit.SECONDS);
    }

    /**
     *Checkpoint mit Wiederholungsversuch
     *
     * @param checkpointer
     */
    private void checkpoint(IRecordProcessorCheckpointer checkpointer) {
        LOG.info("Checkpointing shard " + kinesisShardId);
        for (int i = 0; i < config.getCheckpointRetries(); i++) {
            try {
                //Überprüfen Sie zunächst den Persister, um sicherzustellen, dass alle berechneten Zählwerte beibehalten wurden
                persister.checkpoint();
                checkpointer.checkpoint();
                return;
            } catch (ShutdownException se) {
                //Prüfpunkte ignorieren, wenn die Prozessorinstanz heruntergefahren wird (Fehler)
                LOG.info("Caught shutdown exception, skipping checkpoint.", se);
                return;
            } catch (ThrottlingException e) {
                //Vorübergehender Ausfall)In diesem Fall gehen Sie zurück und versuchen Sie es erneut
                if (i >= (config.getCheckpointRetries() - 1)) {
                    LOG.error("Checkpoint failed after " + (i + 1) + "attempts.", e);
                    break;
                } else {
                    LOG.info("Transient issue when checkpointing - attempt " + (i + 1) + " of "
                            + config.getCheckpointRetries(),
                            e);
                }
            } catch (InvalidStateException e) {
                //Zeigt ein Problem mit DynamoDB an (prüft Tabellen und IOPS)
                LOG.error("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", e);
                break;
            } catch (InterruptedException e) {
                LOG.error("Error encountered while checkpointing count persister.", e);
                //Fehler beim erneuten Versuch
            }
            try {
                Thread.sleep(config.getCheckpointBackoffTimeInSeconds());
            } catch (InterruptedException e) {
                LOG.debug("Interrupted sleep", e);
            }
        }
        //Behandeln Sie diesen Fehler genauso wie Host- oder Prozessabstürze oder JVM-Abbruch
        LOG.fatal("Couldn't successfully persist data within max retry limit. Aborting the JVM to mimic a crash.");
        System.exit(1);
    }
}

Fortgeschritten (fortgeschritten) Für Java-Anfänger ist es etwas schwierig, aber [IRecordProcessor-Methode implementieren](https://docs.aws.amazon.com/ja_jp/streams/latest/dev/kinesis- Die Datei-Prozessor-Implementierung-App-Java.html # kinesis-Datensatz-Prozessor-Implementierung-Schnittstelle-Java) wird in dieser Datei ausgeführt (bitte verzeihen Sie den Kommentar Japanische Übersetzung ist schwierig).

--initialize () : Initialisieren, zu verarbeitenden Shard identifizieren --processRecords () : Verarbeitet Datensätze --shutdown () : Ende der Verarbeitung

Implementiert die drei Methoden zum Angeben einer Reihe von Prozessen, und processRecord wird die Methode "checkpoint ()" übergeben, um die bereits verarbeiteten Datensätze zu verfolgen. Diese steuern, mit welchem Teil von welcher Scherbe Sie arbeiten.

Während das Fenster 2 Sekunden lang verschoben wird, scheint es tatsächlich, dass es die Top-3-Zuschauer zählt und speichert (ich verstehe diesen Teil noch nicht vollständig, zukünftige Arbeit.)

DynamoDB KCL erstellt in DynamoDB eine Tabelle, um Informationen zum Anwendungsstatus ("Checkpoint" und Shard-Worker-Korrespondenz) zu verwalten. Wenn Sie die Anwendung tatsächlich ausführen, können Sie sehen, dass die DynamoDB-Tabelle erstellt wurde. Eine dient zum Speichern des Zählergebnisses und die andere zur Tabelle für diese Statusverwaltung (KinesisDataVisSampleApp-KCLDynamoDBTable- [randomString]). Wenn Sie hineinschauen, finden Sie verschiedene Schlüssel wie "leaseKey" und "checkpoint". Diese werden verwendet, um den Status zu verwalten, in dem der Satz gelesen wird. Es gibt Beschreibung jedes Schlüssels in diesem Dokument.

Vorerst werde ich das Verständnis des Inhalts überspringen, aber es wird notwendig sein, den Shard entsprechend der Lesegeschwindigkeit zu ändern.


Ich habe bisher studiert, aber [Tutorial: Erste Schritte mit Amazon Kinesis-Datenströmen mit AWS CLI](https://docs.aws.amazon.com/ja_jp/streams/latest/dev/kinesis-tutorial- cli.html) sieht aus wie das nächste Tutorial, daher werde ich versuchen, es über die CLI zu berühren.

Recommended Posts

Kinesis-Datenströme ohne Java-Erfahrung (1)
Kinesis-Datenströme ohne Java-Erfahrung (3.1)
Kinesis-Datenströme ohne Java-Erfahrung (3.2)
Datenverarbeitung mit der Stream-API von Java 8
Verwenden Sie den PostgreSQL-Datentyp (jsonb) aus Java
Informationen zu CLDR-Gebietsschemadaten, die standardmäßig in Java 9 aktiviert sind
Rufen Sie Java von JRuby aus auf
Änderungen von Java 8 zu Java 11
Summe von Java_1 bis 100
Java Silver Passing Erfahrung
Eval Java-Quelle von Java
[Java] Datentyp ①-Basistyp
Greifen Sie über Java auf API.AI zu
Von Java zu Ruby !!
[Java] Hauptdatentypen
Java-Grunddatentypen
Java, das Daten von Android zu Jetson Nanos ROS überspringt
Holen Sie sich Wettervorhersagen von Watson Weather Company Data mit einfachem Java
CData Software Hands-on (Abrufen von Kintone-Daten aus der Java-Konsolenanwendung)
CData Software Hands-on (Twitter-Daten von der Java-Konsolenanwendung abrufen)