[Details] Implementierung von Consumer-Anwendungen mit der Kinesis Client Library für Java

Einführung

** Kinesis Client Library (KCL) ** ist eine von AWS erstellte Bibliothek zum Implementieren von Verbraucheranwendungen, die Daten (Datensätze) empfangen und verarbeiten, die in * AWS Kinesis Data Stream * fließen.

Kinesis-Datensätze verarbeiten

Es gibt eine der beiden Methoden, aber im ersteren Fall wird ** KCL ** verwendet.

Es ist jedoch schwer zu verstehen, wann der ** Plattenprozessor ** aufgerufen wird und was passiert, wenn der ** Plattenprozessor ** einen Fehler zurückgibt. Wenn Sie hier einen Fehler machen, Kinesis Datensätze können im Stream bleiben oder unbeabsichtigt verworfen werden.

Dieser Artikel ergänzt Elemente, die nicht im Offiziellen Handbuch zu AWS Kinesis-Datenströmen angegeben sind. Ich werde erklären, wie eine Anwendung mit ** KCL ** implementiert wird.

Beachten Sie, dass ** KCL ** Bibliotheken für * Java *, * JavaScript (Node.js) *, * Python *, * .NET * und * Ruby * enthält, in diesem Artikel jedoch * Behandelt Bibliotheken für Java * (1.x-Schnittstelle v2. Details werden später beschrieben).

Was ist die Kinesis Client Library?

Funktionen von KCL

Holen Sie sich Rekord

Normalerweise erfordert das Abrufen eines Datensatzes aus einem Kinesis-Stream die Schritte zum Abrufen eines Shards aus dem interessierenden Stream, zum Durchlaufen der Datensätze im Shard durch einen Iterator und zum Durchlaufen des Iterators. (Weitere Informationen finden Sie unter Verbraucherentwicklung mit Kinesis Data Streams API und AWS SDK für Java (https://docs.aws.amazon.com/ja_jp/streams/latest/dev/developing-consumers-with-sdk.html). ) Sehen)

** KCL ** erledigt dies automatisch für Sie und holt den Datensatz für Sie.

Verwaltung der Datensatzverarbeitungsposition

Kinesis-Streams sind, wie der Name schon sagt, Streams. Im Gegensatz zu Warteschlangen können Sie abgerufene Datensätze nicht löschen. Die Aufzeichnungen bleiben im Stream, bis eine bestimmte Zeit verstrichen ist.

Wenn Sie sich nicht erinnern, wie weit die Anwendung die Datensätze im Stream verarbeitet hat, werden alle Datensätze im Stream von Anfang an erneut verarbeitet (TRIM_HORIZON), wenn die Anwendung gestoppt / neu gestartet wird oder die Anwendung gestoppt wird. Alle eingegangenen Datensätze werden verworfen ("LATEST").

** KCL ** verfolgt, welche Datensätze verarbeitet wurden, und setzt die Verarbeitung der noch nicht verarbeiteten Datensätze fort, wenn die Anwendung gestoppt / neu gestartet wird.

Nach Scherbenzunahme / -abnahme

Um den Kinesis-Stream zu skalieren, erhöhen Sie die Anzahl der Shards. Wenn Sie den Datensatzerfassungsprozess jedoch wie oben beschrieben selbst implementieren, müssen Sie einen Mechanismus implementieren, um zu erkennen, dass die Shards zugenommen (verringert) haben. Ich werde.

** KCL ** erkennt die Zunahme oder Abnahme von Shards im Kinesis-Stream und beginnt automatisch mit der Verarbeitung der neu geöffneten Shards.

Shard-Parallelverarbeitung

Kinesis-Streams stellen sicher, dass Datensätze mit demselben Partitionsschlüssel immer zum selben Shard fließen. (Setzen Sie den Partitionsschlüssel auf eine eindeutige Kennung für den Datensatz, z. B. "Bestellnummer".)

Dies bedeutet, dass verwandte Datensätze nicht über mehrere Shards verteilt sind, sodass jeder Shard vollständig unabhängig voneinander parallel verarbeitet werden kann. (Vielmehr macht es keinen Sinn, die Scherben anders zu erhöhen.)

** KCL ** verarbeitet jeden erkannten Shard parallel in einem separaten Thread.

Verteilte Verarbeitung durch mehrere Prozesse

** KCL ** -Anwendungen verarbeiten normalerweise alle Shards des Ziel-Kinesis-Streams in einem Prozess. Mit zunehmender Anzahl von Shards steigt jedoch der Grad der Parallelität und ein Prozess kann nicht damit umgehen. Es gibt eine Möglichkeit.

In diesem Fall können Sie mehrere KCL-Anwendungen (Prozesse) für denselben Kinesis-Stream starten, um die Last auf die Prozesse zu verteilen.

KCL für Java-Version

** KCL für Java ** hat derzeit die folgenden Versionen: Dieser Artikel befasst sich mit ** "Modul Version 1.x Schnittstelle Version v2" **.

Modulversion Schnittstellenversion Paket Bemerkungen
1.x [GitHub] v1 com.amazonaws.services.kinesis.clientlibrary.interfaces Wird auch als "Originalschnittstelle" bezeichnet.
v2 com.amazonaws.services.kinesis.clientlibrary.interfaces.v2 Dies ist die in diesem Artikel behandelte Version
2.x [GitHub] - software.amazon.kinesis UmdaserweiterteFanoutzunutzenVerwendenSiedieseVersionErforderlich

Der Unterschied zwischen den Schnittstellenversionen * v1 * und * v2 * der Modulversion * 1.x * ist gering,

Es sind zwei Punkte.

Die Modulversionen * 1.x * und * 2.x * unterscheiden sich nicht wesentlich in der Verwendung des implementierenden ** Datensatzprozessors **, aber die API wurde neu gestaltet und ist nicht kompatibel.

KCL-Konzept

KCL.jpg

Prozessor aufnehmen

Ein Handler zum Verarbeiten von Datensätzen, die durch den Kinesis-Stream fließen. Die Basisschnittstelle definiert drei Methoden: "Initialisierungsverarbeitung", "Datensatzverarbeitung" und "Beendigungsverarbeitung". Anwendungsentwickler müssen diese drei Methoden implementieren.

Der ** Datensatzprozessor ** wird in einer KCL-Anwendung mit einer Eins-zu-Eins-Korrespondenz mit Kinesis-Stream-Shards instanziiert und ist für die Verarbeitung der durch jeden Shard fließenden Datensätze verantwortlich.

Kontrollpunkt

Zeigt an, wie weit die Datensätze im Kinesis-Stream (genauer gesagt im Shard) verarbeitet wurden. Checkpoints werden nicht automatisch aufgezeichnet und müssen vom Anwendungsentwickler im ** Record Processor ** entsprechend aufgezeichnet werden. (Die Komponente zum Aufzeichnen von Checkpoints heißt ** Checkpointer **)

** Checkpoints ** werden in * DynamoDB * aufgezeichnet. Wenn Sie die KCL-Anwendung ausführen, erstellt ** KCL ** automatisch eine Tabelle in * DynamoDB *.

Arbeiter

Die Verwaltung des Prozessorlebenszyklus (Generierung / Beendigung) wird gemäß der Anzahl der Shards im Kinesis-Stream durchgeführt. Der Anwendungsentwickler muss einen ** Worker ** mit den erforderlichen Parametern generieren und starten (z. B. welchen Kinesis-Stream verarbeitet werden soll).

In der KCL-Anwendung gibt es nur einen ** Arbeiter **.

Implementierung der KCL-Anwendung

Die folgenden Schritte sind erforderlich, um eine Anwendung mit ** KCL ** zu entwickeln.

Beispielcode

Beispielcode (Java / Scala / Kotlin) ist auf GitHub verfügbar.

Der AWS-Mitarbeiter veröffentlicht auch Beispielcode (Java).

Prozessorimplementierung aufzeichnen

Implementieren Sie zunächst einen ** Datensatzprozessor **, um Kinesis-Datensätze zu verarbeiten.

Der Plattenprozessor ist threadsicher. Jede Methode des Datensatzprozessors kann nicht von mehreren Threads aufgerufen werden. Daher kann der Datensatzprozessor die erforderlichen Informationen als Instanzvariable haben.

ExampleRecordProcessor


public class ExampleRecordProcessor implements IRecordProcessor {

    private final String tableName;

    ExampleRecordProcessor(String tableName) {
        this.tableName = tableName;
    }

    private String shardId;
    private AmazonDynamoDB dynamoDB;
    private Table table;

    @Override
    public void initialize(InitializationInput initializationInput) {
        shardId = initializationInput.getShardId();

        // Initialize any resources for #processRecords().
        dynamoDB = AmazonDynamoDBClientBuilder.defaultClient();
        table = new Table(dynamoDB, tableName);
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        // Processing incoming records.
        retry(() -> {
            processRecordsInput.getRecords().forEach(record -> {
                System.out.println(record);
            });
        });

        // Record checkpoint if all incoming records processed successfully.
        recordCheckpoint(processRecordsInput.getCheckpointer());
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        // Record checkpoint at closing shard if shutdown reason is TERMINATE.
        if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
            recordCheckpoint(shutdownInput.getCheckpointer());
        }

        // Cleanup initialized resources.
        Optional.ofNullable(dynamoDB).ifPresent(AmazonDynamoDB::shutdown);
    }

    private void recordCheckpoint(IRecordProcessorCheckpointer checkpointer) {
        retry(() -> {
            try {
                checkpointer.checkpoint();
            } catch (Throwable e) {
                throw new RuntimeException("Record checkpoint failed.", e);
            }
        });
    }

    private void retry(Runnable f) {
        try {
            f.run();
        } catch (Throwable e) {
            System.out.println(String.format("An error occurred %s. That will be retry...", e.getMessage()));
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e2) {
                e2.printStackTrace();
            }
            retry(f);
        }
    }

Implementierung der Record Processor Factory

Implementieren Sie dann die ** Record Processor Factory **, die zum Generieren des Record Processors verwendet wird. ** Worker ** verwendet diese Factory, um einen Datensatzprozessor zu generieren.

ExampleRecordProcessorFactory


public class ExampleRecordProcessorFactory implements IRecordProcessorFactory {

    private final String tableName;

    ExampleRecordProcessorFactory(String tableName) {
        this.tableName = tableName;
    }

    @Override
    public IRecordProcessor createProcessor() {
        return new ExampleRecordProcessor(tableName);
    }
}

Erstellen und Starten von Workern

Erstellen Sie ab dem Einstiegspunkt (* Main * -Klasse) der KCL-Anwendung einen ** Worker ** und starten Sie ihn.

Verwenden Sie com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.Builder, um * ** Arbeiter ** zu generieren.

Die Lese- / Schreibkapazität der DynamoDB-Tabelle zum Aufzeichnen von Prüfpunkten, die automatisch von * ** KCL ** erstellt wird, ist standardmäßig "10". Wenn Sie es ändern möchten, können Sie es mit KinesisClientLibConfiguration angeben.

App


public class App {

    public static void main(String... args) {

        // Create a Worker.
        final Worker worker = new Worker.Builder()
                .recordProcessorFactory(
                        new ExampleRecordProcessorFactory("examples-table")
                )
                .config(
                        new KinesisClientLibConfiguration(
                                "kcl-java-example",
                                "kcl-sample",
                                DefaultAWSCredentialsProviderChain.getInstance(),
                                generateWorkerId()
                        ).withRegionName("us-east-1")
                        .withInitialLeaseTableReadCapacity(1)
                        .withInitialLeaseTableWriteCapacity(1)
                )
                .build();

        // Start the worker.
        worker.run();
    }

    private static String generateWorkerId() {
        try {
            return InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
        } catch (UnknownHostException e) {
            throw new RuntimeException("Could not generate worker ID.", e);
        }
    }

}

(Optional) Sicherer Stopp der Arbeitnehmer

Rufen Sie "Worker # startGracefulShutdown ()" auf, um den gestarteten ** Worker ** sicher zu stoppen (wenn sich ein Datensatz im Prozess befindet, wird die Verarbeitung beendet und ein Prüfpunkt vor dem Stoppen aufgezeichnet).

Normalerweise ein Java VM-Shutdown-Hook ([Runtime # addShutdownHook ()](https://docs.oracle.com/javase/jp/9/docs/api/java/lang/Runtime.html#addShutdownHook-java.lang. Durch Aufrufen von Thread-)) können Sie den ** Worker ** am Ende des JVM-Prozesses in Ihrer KCL-Anwendung sicher stoppen.

App


final Worker worker = ...;

// Shutdown worker gracefully using shutdown hook.
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    try {
        worker.startGracefulShutdown().get();
    } catch (Throwable e) {
        e.printStackTrace();
    }
}));

Sicherer Stopp des Plattenprozessors

Wenn Sie ** Arbeiter ** sicher herunterfahren möchten, müssen Sie auch einen Datensatzprozessor implementieren, um sicher herunterzufahren.

ExampleRecordProcessor


public class ExampleRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {

    :

    @Override
    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        // Record checkpoint at graceful shutdown.
        recordCheckpoint(checkpointer);
    }

}

Prozessorlebenszyklus aufzeichnen

Prozessorzustandsübergang aufzeichnen


     +-------------------+
     | Waiting on Parent |                               +------------------+
+----+       Shard       |                               |     Shutdown     |
|    |                   |          +--------------------+   Notification   |
|    +----------+--------+          |  Shutdown:         |     Requested    |
|               | Success           |   Requested        +-+-------+--------+
|               |                   |                      |       |
|        +------+-------------+     |                      |       | Shutdown:
|        |    Initializing    +-----+                      |       |  Requested
|        |                    |     |                      |       |
|        |                    +-----+-------+              |       |
|        +---------+----------+     |       | Shutdown:    | +-----+-------------+
|                  | Success        |       |  Terminated  | |     Shutdown      |
|                  |                |       |  Zombie      | |   Notification    +-------------+
|           +------+-------------+  |       |              | |     Complete      |             |
|           |     Processing     +--+       |              | ++-----------+------+             |
|       +---+                    |          |              |  |           |                    |
|       |   |                    +----------+              |  |           | Shutdown:          |
|       |   +------+-------------+          |              \  /           |  Requested         |
|       |          |                        |               \/            +--------------------+
|       |          |                        |               ||
|       | Success  |                        |               || Shutdown:
|       +----------+                        |               ||  Terminated
|                                           |               ||  Zombie
|                                           |               ||
|                                           |               ||
|                                           |           +---++--------------+
|                                           |           |   Shutting Down   |
|                                           +-----------+                   |
|                                                       |                   |
|                                                       +--------+----------+
|                                                                |
|                                                                | Shutdown:
|                                                                |  All Reasons
|                                                                |
|                                                                |
|      Shutdown:                                        +--------+----------+
|        All Reasons                                    |     Shutdown      |
+-------------------------------------------------------+     Complete      |
                                                        |                   |
                                                        +-------------------+

Wenn jede Methode des Datensatzprozessors aufgerufen wird

1.Wenn der Arbeiter anfängt 2.Beim Empfang einer Aufzeichnung 3.Bei Scherbe SCHLIESSEN (* 1) 4.Wenn die Scherbe geöffnet ist (* 1) 5.Wenn ein Arbeiter sicher angehalten wird
IRecordProcessor#initialize() ① Sicherung von Ressourcen usw. - - ① Sicherung von Ressourcen usw. -
IRecordProcessor#processRecords() - ① Verarbeitung empfangener Datensätze
② Aufzeichnung des Kontrollpunkts
- - -
IRecordProcessor#shutdown()(※2) - - reason=TERMINATECheckpoint-Aufzeichnung - reason=ZOMBIE
IShutdownNotificationAware#shutdownRequested()(※2) - - - - ① Aufzeichnung des Kontrollpunkts

Fehlerbehandlung

Prozessorfehlerbehandlung aufzeichnen

Das Verhalten, wenn von jeder vom Datensatzprozessor implementierten Methode eine Ausnahme ausgelöst wird, ist wie folgt.

Methode Verhalten beim Auslösen einer Ausnahme
IRecordProcessor#initialize() Es wird weiterhin wiederholt aufgerufen, bis es normal zurückkehrt.
IRecordProcessor#processRecords() Das Fehlerprotokoll wurde ausgegeben und als Argument übergebenDatensätze überspringenGetan werden.
IRecordProcessor#shutdown() Es wird weiterhin wiederholt aufgerufen, bis es normal zurückkehrt.
IShutdownNotificationAware#shutdownRequested() Es wird weiterhin wiederholt aufgerufen, bis es normal zurückkehrt.

Wenn Sie nicht überspringen möchten, wenn bei der Verarbeitung des Datensatzes ein Fehler auftritt

Wenn bei der Datensatzverarbeitung (# processRecords ()) des ** Datensatzprozessors ** ein Fehler auftritt, besteht die Grundidee von ** KCL ** darin, den Datensatz zu überspringen und mit dem nächsten Datensatz fortzufahren. ..

Wenn Sie Datensätze aufgrund von Anwendungsanforderungen nicht überspringen möchten (z. B. wenn Datensätze in der richtigen Reihenfolge verarbeitet werden müssen), verfügt ** KCL ** nicht über einen Mechanismus. Implementieren Sie ihn daher selbst. wird gebraucht.

Betrieb der KCL-Anwendung

Protokollierung

Die Protokollausgabe von ** KCL ** kann von den folgenden Protokollierern gesteuert werden.

Logger Erläuterung
com.amazonaws.services.kinesis.clientlibrary NormalerweiseINFOKCLWenn Sie das Debug-Protokoll von ausgeben möchtenDEBUG
com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker INFOWenn Sie Folgendes tunSleeping...Weil es ärgerlich ist, weil das Protokoll sagtWARNEs ist gut, es zu behalten.

Checkpoint-Tabellenverwaltung

Verweise

Recommended Posts

[Details] Implementierung von Consumer-Anwendungen mit der Kinesis Client Library für Java
[AWS x Java] Dynamo DB-Streams mit Kinesis Client Library (KCL) verarbeiten
Implementierung der Klonmethode für Java Record
Rufen Sie eine Liste mit MBean-Informationen für Java-Anwendungen ab
CI die Architektur von Java / Kotlin-Anwendungen mit ArchUnit
Überprüfung und Implementierung der CSV-Bibliothek zum Laden großer Datenmengen in MySQL (Java)
[Java] Vereinfachen Sie die Implementierung der Datenverlaufsverwaltung mit Reladomo
Implementierung eines mathematischen Syntaxanalysators durch rekursive absteigende Syntaxanalysemethode (Java)
[Java] Beispielprojekt zum Entwickeln von Webanwendungen mit Spring Boot
[Java] Implementierung des Faistel-Netzwerks
Implementierung von XLPagerTabStrip mit TabBarController
Java-Implementierung von Tri-Tree
Beispiel für die Verwendung der Bulk-API von Salesforce vom Java-Client mit PK-Chunking
[Code Pipeline x Elastic Beanstalk] Zusammenfassung der Fehler und Gegenmaßnahmen für CI / CD-Java-Anwendungen zu Elastic Beanstalk mit Code Pipeline
[Für Anfänger] Zusammenfassung des Java-Konstruktors
[Java EE] Implementieren Sie den Client mit WebSocket
Verwandeln Sie Java-Anwendungen mit Jib ganz einfach in Docker
Aktivieren Sie OpenCV mit Java8. (Für mich)
Kotlin-Generika für Java-Entwickler
Implementierung einer ähnlichen Funktion in Java
Holen Sie sich eine Liste der S3-Dateien mit ListObjectsV2Request (AWS SDK für Java)
Grundlagen der Java-Programmierung - Ich möchte ein Dreieck mit einer for-Anweisung ① anzeigen
Grundlagen der Java-Programmierung - Ich möchte ein Dreieck mit einer for-Anweisung ② anzeigen