** Kinesis Client Library (KCL) ** ist eine von AWS erstellte Bibliothek zum Implementieren von Verbraucheranwendungen, die Daten (Datensätze) empfangen und verarbeiten, die in * AWS Kinesis Data Stream * fließen.
Kinesis-Datensätze verarbeiten
Es gibt eine der beiden Methoden, aber im ersteren Fall wird ** KCL ** verwendet.
Es ist jedoch schwer zu verstehen, wann der ** Plattenprozessor ** aufgerufen wird und was passiert, wenn der ** Plattenprozessor ** einen Fehler zurückgibt. Wenn Sie hier einen Fehler machen, Kinesis Datensätze können im Stream bleiben oder unbeabsichtigt verworfen werden.
Dieser Artikel ergänzt Elemente, die nicht im Offiziellen Handbuch zu AWS Kinesis-Datenströmen angegeben sind. Ich werde erklären, wie eine Anwendung mit ** KCL ** implementiert wird.
Beachten Sie, dass ** KCL ** Bibliotheken für * Java *, * JavaScript (Node.js) *, * Python *, * .NET * und * Ruby * enthält, in diesem Artikel jedoch * Behandelt Bibliotheken für Java * (1.x-Schnittstelle v2. Details werden später beschrieben).
Normalerweise erfordert das Abrufen eines Datensatzes aus einem Kinesis-Stream die Schritte zum Abrufen eines Shards aus dem interessierenden Stream, zum Durchlaufen der Datensätze im Shard durch einen Iterator und zum Durchlaufen des Iterators. (Weitere Informationen finden Sie unter Verbraucherentwicklung mit Kinesis Data Streams API und AWS SDK für Java (https://docs.aws.amazon.com/ja_jp/streams/latest/dev/developing-consumers-with-sdk.html). ) Sehen)
** KCL ** erledigt dies automatisch für Sie und holt den Datensatz für Sie.
Kinesis-Streams sind, wie der Name schon sagt, Streams. Im Gegensatz zu Warteschlangen können Sie abgerufene Datensätze nicht löschen. Die Aufzeichnungen bleiben im Stream, bis eine bestimmte Zeit verstrichen ist.
Wenn Sie sich nicht erinnern, wie weit die Anwendung die Datensätze im Stream verarbeitet hat, werden alle Datensätze im Stream von Anfang an erneut verarbeitet (TRIM_HORIZON
), wenn die Anwendung gestoppt / neu gestartet wird oder die Anwendung gestoppt wird. Alle eingegangenen Datensätze werden verworfen ("LATEST").
** KCL ** verfolgt, welche Datensätze verarbeitet wurden, und setzt die Verarbeitung der noch nicht verarbeiteten Datensätze fort, wenn die Anwendung gestoppt / neu gestartet wird.
Um den Kinesis-Stream zu skalieren, erhöhen Sie die Anzahl der Shards. Wenn Sie den Datensatzerfassungsprozess jedoch wie oben beschrieben selbst implementieren, müssen Sie einen Mechanismus implementieren, um zu erkennen, dass die Shards zugenommen (verringert) haben. Ich werde.
** KCL ** erkennt die Zunahme oder Abnahme von Shards im Kinesis-Stream und beginnt automatisch mit der Verarbeitung der neu geöffneten Shards.
Kinesis-Streams stellen sicher, dass Datensätze mit demselben Partitionsschlüssel immer zum selben Shard fließen. (Setzen Sie den Partitionsschlüssel auf eine eindeutige Kennung für den Datensatz, z. B. "Bestellnummer".)
Dies bedeutet, dass verwandte Datensätze nicht über mehrere Shards verteilt sind, sodass jeder Shard vollständig unabhängig voneinander parallel verarbeitet werden kann. (Vielmehr macht es keinen Sinn, die Scherben anders zu erhöhen.)
** KCL ** verarbeitet jeden erkannten Shard parallel in einem separaten Thread.
** KCL ** -Anwendungen verarbeiten normalerweise alle Shards des Ziel-Kinesis-Streams in einem Prozess. Mit zunehmender Anzahl von Shards steigt jedoch der Grad der Parallelität und ein Prozess kann nicht damit umgehen. Es gibt eine Möglichkeit.
In diesem Fall können Sie mehrere KCL-Anwendungen (Prozesse) für denselben Kinesis-Stream starten, um die Last auf die Prozesse zu verteilen.
** KCL für Java ** hat derzeit die folgenden Versionen: Dieser Artikel befasst sich mit ** "Modul Version 1.x Schnittstelle Version v2" **.
Modulversion | Schnittstellenversion | Paket | Bemerkungen |
---|---|---|---|
1.x [GitHub] | v1 | com.amazonaws.services.kinesis.clientlibrary.interfaces |
Wird auch als "Originalschnittstelle" bezeichnet. |
〃 | v2 | com.amazonaws.services.kinesis.clientlibrary.interfaces.v2 |
Dies ist die in diesem Artikel behandelte Version |
2.x [GitHub] | - | software.amazon.kinesis |
UmdaserweiterteFanoutzunutzenVerwendenSiedieseVersionErforderlich |
Der Unterschied zwischen den Schnittstellenversionen * v1 * und * v2 * der Modulversion * 1.x * ist gering,
Es sind zwei Punkte.
Die Modulversionen * 1.x * und * 2.x * unterscheiden sich nicht wesentlich in der Verwendung des implementierenden ** Datensatzprozessors **, aber die API wurde neu gestaltet und ist nicht kompatibel.
Ein Handler zum Verarbeiten von Datensätzen, die durch den Kinesis-Stream fließen. Die Basisschnittstelle definiert drei Methoden: "Initialisierungsverarbeitung", "Datensatzverarbeitung" und "Beendigungsverarbeitung". Anwendungsentwickler müssen diese drei Methoden implementieren.
Der ** Datensatzprozessor ** wird in einer KCL-Anwendung mit einer Eins-zu-Eins-Korrespondenz mit Kinesis-Stream-Shards instanziiert und ist für die Verarbeitung der durch jeden Shard fließenden Datensätze verantwortlich.
Zeigt an, wie weit die Datensätze im Kinesis-Stream (genauer gesagt im Shard) verarbeitet wurden. Checkpoints werden nicht automatisch aufgezeichnet und müssen vom Anwendungsentwickler im ** Record Processor ** entsprechend aufgezeichnet werden. (Die Komponente zum Aufzeichnen von Checkpoints heißt ** Checkpointer **)
** Checkpoints ** werden in * DynamoDB * aufgezeichnet. Wenn Sie die KCL-Anwendung ausführen, erstellt ** KCL ** automatisch eine Tabelle in * DynamoDB *.
Die Verwaltung des Prozessorlebenszyklus (Generierung / Beendigung) wird gemäß der Anzahl der Shards im Kinesis-Stream durchgeführt. Der Anwendungsentwickler muss einen ** Worker ** mit den erforderlichen Parametern generieren und starten (z. B. welchen Kinesis-Stream verarbeitet werden soll).
In der KCL-Anwendung gibt es nur einen ** Arbeiter **.
Die folgenden Schritte sind erforderlich, um eine Anwendung mit ** KCL ** zu entwickeln.
Beispielcode (Java / Scala / Kotlin) ist auf GitHub verfügbar.
Der AWS-Mitarbeiter veröffentlicht auch Beispielcode (Java).
Implementieren Sie zunächst einen ** Datensatzprozessor **, um Kinesis-Datensätze zu verarbeiten.
Der Plattenprozessor ist threadsicher. Jede Methode des Datensatzprozessors kann nicht von mehreren Threads aufgerufen werden. Daher kann der Datensatzprozessor die erforderlichen Informationen als Instanzvariable haben.
Implementiert die Schnittstelle "com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor".
Implementieren Sie die anfängliche Verarbeitungsmethode "void initialize (InitializationInput)".
Wenn Sie über die zur Verarbeitung des Datensatzes erforderlichen Ressourcen verfügen, initialisieren Sie diese hier.
Über den Parameter InitializationInput
können Sie die ID des Shards abrufen, für den dieser Datensatzprozessor verantwortlich ist. Bewahren Sie ihn daher bei Bedarf in einer Instanzvariablen auf.
Implementieren Sie die Datensatzverarbeitungsmethode "void processRecords (ProcessRecordsInput)".
Verarbeitet Datensätze, die von Kinesis-Streams empfangen wurden.
Eine Liste der empfangenen Datensätze erhalten Sie über den Parameter ProcessRecordsInput
.
Zeichnen Sie nach erfolgreicher Verarbeitung des Datensatzes den Prüfpunkt mit dem ** Checkpointer ** auf, der mit ProcessRecordsInput # getCheckpointer ()
abgerufen werden kann.
Checkpoints werden in * DynamoDB * aufgezeichnet, sodass unter starker Last Ausnahmen wie unzureichende Kapazität auftreten können. Sie sollten es erneut versuchen, wenn eine Ausnahme auftritt, um sicherzustellen, dass Prüfpunkte aufgezeichnet werden.
Sie müssen berücksichtigen, dass derselbe Datensatz mehrmals verarbeitet werden kann, z. B. wenn Sie eine KCL-Anwendung stoppen / neu starten. Mit anderen Worten, die Verarbeitung des Datensatzes sollte ordnungsgemäß erfolgen.
Implementieren Sie die Beendigungsmethode "void shutdown (ShutdownInput)".
** Beachten Sie, dass diese Methode erst aufgerufen wird, wenn #processRecords () verarbeitet wurde. ** ** **
Geben Sie die bei der Erstverarbeitung gesicherten Ressourcen frei.
Prüfpunkte nur aufzeichnen, wenn der von "ShutdownInput # getShutdownReason ()" zurückgegebene Beendigungsgrund "TERMINATE" ist.
Checkpoints werden in * DynamoDB * aufgezeichnet, sodass unter starker Last Ausnahmen wie unzureichende Kapazität auftreten können. Sie sollten es erneut versuchen, wenn eine Ausnahme auftritt, um sicherzustellen, dass Prüfpunkte aufgezeichnet werden.
ExampleRecordProcessor
public class ExampleRecordProcessor implements IRecordProcessor {
private final String tableName;
ExampleRecordProcessor(String tableName) {
this.tableName = tableName;
}
private String shardId;
private AmazonDynamoDB dynamoDB;
private Table table;
@Override
public void initialize(InitializationInput initializationInput) {
shardId = initializationInput.getShardId();
// Initialize any resources for #processRecords().
dynamoDB = AmazonDynamoDBClientBuilder.defaultClient();
table = new Table(dynamoDB, tableName);
}
@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
// Processing incoming records.
retry(() -> {
processRecordsInput.getRecords().forEach(record -> {
System.out.println(record);
});
});
// Record checkpoint if all incoming records processed successfully.
recordCheckpoint(processRecordsInput.getCheckpointer());
}
@Override
public void shutdown(ShutdownInput shutdownInput) {
// Record checkpoint at closing shard if shutdown reason is TERMINATE.
if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
recordCheckpoint(shutdownInput.getCheckpointer());
}
// Cleanup initialized resources.
Optional.ofNullable(dynamoDB).ifPresent(AmazonDynamoDB::shutdown);
}
private void recordCheckpoint(IRecordProcessorCheckpointer checkpointer) {
retry(() -> {
try {
checkpointer.checkpoint();
} catch (Throwable e) {
throw new RuntimeException("Record checkpoint failed.", e);
}
});
}
private void retry(Runnable f) {
try {
f.run();
} catch (Throwable e) {
System.out.println(String.format("An error occurred %s. That will be retry...", e.getMessage()));
try {
Thread.sleep(3000);
} catch (InterruptedException e2) {
e2.printStackTrace();
}
retry(f);
}
}
Implementieren Sie dann die ** Record Processor Factory **, die zum Generieren des Record Processors verwendet wird. ** Worker ** verwendet diese Factory, um einen Datensatzprozessor zu generieren.
Implementieren Sie die Schnittstelle "com.amazonaws.services.kinesis.clientlibrary.interfaces.v2. IRecordProcessorFactory".
Implementieren Sie die Erstellungsmethode des Datensatzprozessors "IRecordProcessor createProcessor ()".
Erzeugt und gibt eine Instanz des oben implementierten ** Datensatzprozessors ** zurück.
ExampleRecordProcessorFactory
public class ExampleRecordProcessorFactory implements IRecordProcessorFactory {
private final String tableName;
ExampleRecordProcessorFactory(String tableName) {
this.tableName = tableName;
}
@Override
public IRecordProcessor createProcessor() {
return new ExampleRecordProcessor(tableName);
}
}
Erstellen Sie ab dem Einstiegspunkt (* Main * -Klasse) der KCL-Anwendung einen ** Worker ** und starten Sie ihn.
Verwenden Sie com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.Builder
, um * ** Arbeiter ** zu generieren.
# build ()
.Die Lese- / Schreibkapazität der DynamoDB-Tabelle zum Aufzeichnen von Prüfpunkten, die automatisch von * ** KCL ** erstellt wird, ist standardmäßig "10".
Wenn Sie es ändern möchten, können Sie es mit KinesisClientLibConfiguration
angeben.
Worker # run ()
Startet den generierten ** Worker **.App
public class App {
public static void main(String... args) {
// Create a Worker.
final Worker worker = new Worker.Builder()
.recordProcessorFactory(
new ExampleRecordProcessorFactory("examples-table")
)
.config(
new KinesisClientLibConfiguration(
"kcl-java-example",
"kcl-sample",
DefaultAWSCredentialsProviderChain.getInstance(),
generateWorkerId()
).withRegionName("us-east-1")
.withInitialLeaseTableReadCapacity(1)
.withInitialLeaseTableWriteCapacity(1)
)
.build();
// Start the worker.
worker.run();
}
private static String generateWorkerId() {
try {
return InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
} catch (UnknownHostException e) {
throw new RuntimeException("Could not generate worker ID.", e);
}
}
}
Rufen Sie "Worker # startGracefulShutdown ()" auf, um den gestarteten ** Worker ** sicher zu stoppen (wenn sich ein Datensatz im Prozess befindet, wird die Verarbeitung beendet und ein Prüfpunkt vor dem Stoppen aufgezeichnet).
Normalerweise ein Java VM-Shutdown-Hook ([Runtime # addShutdownHook ()](https://docs.oracle.com/javase/jp/9/docs/api/java/lang/Runtime.html#addShutdownHook-java.lang. Durch Aufrufen von Thread-)) können Sie den ** Worker ** am Ende des JVM-Prozesses in Ihrer KCL-Anwendung sicher stoppen.
App
final Worker worker = ...;
// Shutdown worker gracefully using shutdown hook.
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
worker.startGracefulShutdown().get();
} catch (Throwable e) {
e.printStackTrace();
}
}));
Wenn Sie ** Arbeiter ** sicher herunterfahren möchten, müssen Sie auch einen Datensatzprozessor implementieren, um sicher herunterzufahren.
** Zusätzlich implementiert die Schnittstelle com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware
zum Datensatzprozessor **.
Implementieren Sie die Beendigungsanforderungsmethode "void shutdownRequested (IRecordProcessorCheckpointer)".
** Beachten Sie, dass diese Methode erst aufgerufen wird, wenn #processRecords () verarbeitet wurde. ** ** **
Kontrollpunkte aufzeichnen.
Nach dieser Methode wird auch die oben erwähnte Beendigungsverarbeitungsmethode "#shutdown ()" aufgerufen. Der Grund für die Beendigung ist jedoch "ZOMBIE". Daher muss in dieser Methode ein Prüfpunkt aufgezeichnet werden.
Checkpoints werden in * DynamoDB * aufgezeichnet, sodass unter starker Last Ausnahmen wie unzureichende Kapazität auftreten können. Sie sollten es erneut versuchen, wenn eine Ausnahme auftritt, um sicherzustellen, dass Prüfpunkte aufgezeichnet werden.
ExampleRecordProcessor
public class ExampleRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
:
@Override
public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
// Record checkpoint at graceful shutdown.
recordCheckpoint(checkpointer);
}
}
+-------------------+
| Waiting on Parent | +------------------+
+----+ Shard | | Shutdown |
| | | +--------------------+ Notification |
| +----------+--------+ | Shutdown: | Requested |
| | Success | Requested +-+-------+--------+
| | | | |
| +------+-------------+ | | | Shutdown:
| | Initializing +-----+ | | Requested
| | | | | |
| | +-----+-------+ | |
| +---------+----------+ | | Shutdown: | +-----+-------------+
| | Success | | Terminated | | Shutdown |
| | | | Zombie | | Notification +-------------+
| +------+-------------+ | | | | Complete | |
| | Processing +--+ | | ++-----------+------+ |
| +---+ | | | | | |
| | | +----------+ | | | Shutdown: |
| | +------+-------------+ | \ / | Requested |
| | | | \/ +--------------------+
| | | | ||
| | Success | | || Shutdown:
| +----------+ | || Terminated
| | || Zombie
| | ||
| | ||
| | +---++--------------+
| | | Shutting Down |
| +-----------+ |
| | |
| +--------+----------+
| |
| | Shutdown:
| | All Reasons
| |
| |
| Shutdown: +--------+----------+
| All Reasons | Shutdown |
+-------------------------------------------------------+ Complete |
| |
+-------------------+
1.Wenn der Arbeiter anfängt | 2.Beim Empfang einer Aufzeichnung | 3.Bei Scherbe SCHLIESSEN (* 1) | 4.Wenn die Scherbe geöffnet ist (* 1) | 5.Wenn ein Arbeiter sicher angehalten wird | |
---|---|---|---|---|---|
IRecordProcessor#initialize() |
① Sicherung von Ressourcen usw. | - | - | ① Sicherung von Ressourcen usw. | - |
IRecordProcessor#processRecords() |
- | ① Verarbeitung empfangener Datensätze ② Aufzeichnung des Kontrollpunkts |
- | - | - |
IRecordProcessor#shutdown() (※2) |
- | - | ① reason=TERMINATE Checkpoint-Aufzeichnung |
- | ② reason=ZOMBIE |
IShutdownNotificationAware#shutdownRequested() (※2) |
- | - | - | - | ① Aufzeichnung des Kontrollpunkts |
Das Verhalten, wenn von jeder vom Datensatzprozessor implementierten Methode eine Ausnahme ausgelöst wird, ist wie folgt.
Methode | Verhalten beim Auslösen einer Ausnahme |
---|---|
IRecordProcessor#initialize() |
Es wird weiterhin wiederholt aufgerufen, bis es normal zurückkehrt. |
IRecordProcessor#processRecords() |
Das Fehlerprotokoll wurde ausgegeben und als Argument übergebenDatensätze überspringenGetan werden. |
IRecordProcessor#shutdown() |
Es wird weiterhin wiederholt aufgerufen, bis es normal zurückkehrt. |
IShutdownNotificationAware#shutdownRequested() |
Es wird weiterhin wiederholt aufgerufen, bis es normal zurückkehrt. |
Wenn bei der Datensatzverarbeitung (# processRecords ()
) des ** Datensatzprozessors ** ein Fehler auftritt, besteht die Grundidee von ** KCL ** darin, den Datensatz zu überspringen und mit dem nächsten Datensatz fortzufahren. ..
Wenn Sie Datensätze aufgrund von Anwendungsanforderungen nicht überspringen möchten (z. B. wenn Datensätze in der richtigen Reihenfolge verarbeitet werden müssen), verfügt ** KCL ** nicht über einen Mechanismus. Implementieren Sie ihn daher selbst. wird gebraucht.
Wenn # processRecords ()
auf einen Fehler stößt, wird der Prozess weiterhin wiederholt, ohne dass eine Ausnahme zurückgegeben oder ausgelöst wird.
Wenn der Fehler vorübergehend ist (z. B. eine vorübergehende AWS-Unterbrechung), wird er durch einen erneuten Versuch automatisch wiederhergestellt.
Wenn der Fehler dauerhaft ist (z. B. ein unerwarteter Datensatz), wird er nicht automatisch wiederhergestellt und versucht es erneut.
Wenn Sie in einen Zustand geraten, in dem Sie es erneut versuchen, wird der Datensatz nicht mehr weitergeschaltet (Datensätze bleiben erhalten). Sie müssen den Datensatz daher verarbeiten, indem Sie die Anwendung ändern.
Wenn der ** Worker ** so implementiert ist, dass er beim Stoppen einer KCL-Anwendung, die sich in einem Wiederholungszustand befindet, sicher gestoppt wird, versucht der Datensatzprozessor weiterhin mit "# processRecords ()", also " #shutdownRequested () und
#shutdown ()` werden nicht aufgerufen und warten auf die Beendigung, aber das Timeout für das Herunterfahren des Hooks zwingt die JVM zum Beenden.
(Das heißt, der Datensatzprozessor zeichnet den Prüfpunkt nicht auf und wird ab dem nächsten fehlerhaften Datensatz erneut verarbeitet.)
Die Protokollausgabe von ** KCL ** kann von den folgenden Protokollierern gesteuert werden.
Logger | Erläuterung |
---|---|
com.amazonaws.services.kinesis.clientlibrary |
NormalerweiseINFO 。KCLWenn Sie das Debug-Protokoll von ausgeben möchtenDEBUG 。 |
com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker |
INFO Wenn Sie Folgendes tunSleeping... Weil es ärgerlich ist, weil das Protokoll sagtWARN Es ist gut, es zu behalten. |
Die Lese- / Schreibkapazität der DynamoDB-Tabelle zum Aufzeichnen von Prüfpunkten ist unverändert auf den Anfangswert (Standardwert "10") festgelegt. Mit zunehmender Anzahl der im Kinesis-Stream fließenden Datensätze steigt jedoch der Prüfpunkt. Auf Tabellen wird häufiger zugegriffen, und die Kapazität kann knapp werden. Wenn Ihnen die Kapazität ausgeht, werden Fehler beim Aufzeichnen von Prüfpunkten angezeigt. Daher müssen Sie Ihre Kapazität erhöhen. Wenn die Kapazität nicht ausreicht, erhöhen Sie die Kapazität manuell oder skalieren Sie sie automatisch.
Die Prüfpunkttabelle wird nicht automatisch gelöscht. Sie müssen die Prüfpunkttabelle daher manuell löschen, wenn Sie die KCL-Anwendung beenden.
Offizieller AWS-Leitfaden
Kinesis Client Library Consumer Development in Java ・ ・ ・ Anweisungen zur Verwendung von ** KCL für Java **.
Statusverfolgung ・ ・ ・ Erläuterung der einzelnen Elemente in der Prüfpunkttabelle.
Resharding, Erweiterung, Parallelverarbeitung ・ ・ ・ Anzahl der Shards im Kinesis-Stream Erläuterung des Verhaltens bei Änderung.
Duplicate Record Handling-Consumer Retry (https://docs.aws.amazon.com/ja_jp/streams/latest/dev/kinesis-record-processor-duplicates.html#kinesis-record-processor-duplicates -consumer) ・ ・ ・ Erläuterung, dass derselbe Datensatz in der KCL-Anwendung möglicherweise mehrmals verarbeitet wird.
Fehler beim Wiederherstellen von Amazon Kinesis-Datenströmen ・ ・ ・ KCL-Anwendung Beschreibung der Fehlerbehandlung.
[Entwicklung von Verbrauchern in Java mit erweitertem Fanout der Kinesis Client Library 2.x](https://docs.aws.amazon.com/ja_jp/streams/latest/dev/building-enhanced-consumers- kcl-java.html) ・ ・ ・ Erläuterung zur Verwendung des erweiterten Fanouts von Kinesis.
[(BDT403) Best Practices zum Erstellen von Echtzeit-Streaming-Anwendungen mit Amazon Kinesis](https://www.slideshare.net/AmazonWebServices/bdt403-best-practices-for-building-realtime-streaming-applications-with-amazon -kinesis / 15) P ・ ・ Ab P15 gibt es Erklärungen wie Vorsichtsmaßnahmen für die Implementierung von KCL-Anwendungen.
Recommended Posts