** Kinesis Client Library (KCL) ** est une bibliothèque conçue par AWS pour implémenter des applications grand public qui reçoivent et traitent des données (enregistrements) circulant dans * AWS Kinesis Data Stream *.
Pour traiter les enregistrements Kinesis
Il existe l'une ou l'autre des méthodes, mais ** KCL ** est utilisé dans le premier cas.
Cependant, il est difficile de comprendre quand le ** processeur d'enregistrement ** est appelé et ce qui se passe lorsque le ** processeur d'enregistrement ** renvoie une erreur. Si vous faites une erreur ici, Kinesis Les enregistrements peuvent rester dans le flux ou être supprimés par inadvertance.
Cet article complète les éléments non spécifiés dans le Guide officiel des flux de données AWS Kinesis. Je vais vous expliquer comment implémenter une application utilisant ** KCL **.
Notez que ** KCL ** a des bibliothèques pour * Java *, * JavaScript (Node.js) *, * Python *, * .NET * et * Ruby *, mais dans cet article, * Gère les bibliothèques pour Java * (interface 1.x v2. Les détails seront décrits plus loin).
Normalement, obtenir un enregistrement à partir d'un flux Kinesis nécessite les étapes consistant à obtenir un fragment du flux d'intérêt, à obtenir un itérateur pour parcourir les enregistrements dans le fragment et à parcourir l'itérateur. (Pour plus de détails, consultez Développement grand public à l'aide de l'API Kinesis Data Streams et du kit SDK AWS pour Java (https://docs.aws.amazon.com/ja_jp/streams/latest/dev/developing-consumers-with-sdk.html) ) Voir)
** KCL ** le fera automatiquement pour vous et obtiendra l'enregistrement pour vous.
Les flux Kinesis, comme leur nom l'indique, sont des flux, donc contrairement aux files d'attente, vous ne pouvez pas supprimer les enregistrements récupérés. Les enregistrements resteront dans le flux jusqu'à ce qu'un certain temps se soit écoulé.
Par conséquent, si vous ne vous souvenez pas à quel point l'application a traité les enregistrements dans le flux, tous les enregistrements du flux seront traités à nouveau depuis le début (TRIM_HORIZON
) lorsque l'application est arrêtée / redémarrée, ou l'application sera arrêtée. Tous les enregistrements qui ont afflué seront supprimés («DERNIER»).
** KCL ** garde une trace des enregistrements qui ont été traités, et lorsque l'application est arrêtée / redémarrée, il reprend le traitement à partir des enregistrements qui n'ont pas encore été traités.
Pour mettre à l'échelle le flux Kinesis, augmentez le nombre de fragments, mais si vous implémentez vous-même le processus d'acquisition d'enregistrements comme décrit ci-dessus, vous devrez implémenter un mécanisme pour détecter que les fragments ont augmenté (diminué). Je vais.
** KCL ** détecte l'augmentation ou la diminution des partitions dans le flux Kinesis et démarre automatiquement le traitement des partitions nouvellement ouvertes.
Les flux Kinesis garantissent que les enregistrements avec la même clé de partition sont acheminés vers la même partition. (Définissez la clé de partition sur un identifiant unique pour l'enregistrement, par exemple "numéro de commande")
Cela signifie que les enregistrements associés ne sont pas dispersés sur plusieurs fragments, de sorte que chaque fragment peut être traité en parallèle de manière totalement indépendante. (Au contraire, cela n'a pas de sens d'augmenter les fragments autrement)
** KCL ** traite chaque partition détectée en parallèle dans un thread distinct.
Les applications ** KCL ** gèrent généralement toutes les partitions du flux Kinesis cible en un seul processus, mais à mesure que le nombre de partitions augmente, le degré de parallélisme augmente et un processus ne peut pas le gérer. Il y a une possibilité.
Dans ce cas, vous pouvez lancer plusieurs applications (processus) KCL pour le même flux Kinesis afin de répartir la charge entre les processus.
** KCL pour Java ** a actuellement les versions suivantes: Cet article traite de ** "Module version 1.x interface version v2" **.
Version du module | Version de l'interface | paquet | Remarques |
---|---|---|---|
1.x [GitHub] | v1 | com.amazonaws.services.kinesis.clientlibrary.interfaces |
Aussi appelé "interface d'origine" |
〃 | v2 | com.amazonaws.services.kinesis.clientlibrary.interfaces.v2 |
Ceci est la version couverte dans cet article |
2.x [GitHub] | - | software.amazon.kinesis |
PourprofiterdelarépartitionétendueUtilisezcetteversionNécessaire |
La différence entre les versions d'interface * v1 * et * v2 * de la version du module * 1.x * est légère,
C'est deux points.
Les versions de module * 1.x * et * 2.x * ne diffèrent pas de manière significative dans l'utilisation de l'implémentation de ** record processor **, mais l'API a été repensée et n'est pas compatible.
Un gestionnaire pour le traitement des enregistrements circulant dans le flux Kinesis. L'interface de base définit trois méthodes, «traitement d'initialisation», «traitement d'enregistrement» et «traitement de terminaison», et les développeurs d'applications doivent implémenter ces trois méthodes.
Le ** Processeur d'enregistrement ** est instancié dans une application KCL avec une correspondance biunivoque avec les fragments de flux Kinesis et est responsable du traitement des enregistrements circulant dans chaque fragment.
Indique dans quelle mesure les enregistrements du flux Kinesis (plus précisément, dans la partition) ont été traités. Les points de contrôle ne sont pas enregistrés automatiquement et doivent être correctement enregistrés par le développeur de l'application dans le ** Processeur d'enregistrement **. (Le composant d'enregistrement des points de contrôle s'appelle ** checkpointer **)
** Les points de contrôle ** sont enregistrés dans * DynamoDB *. Lorsque vous exécutez l'application KCL, ** KCL ** crée automatiquement une table dans * DynamoDB *.
La gestion du cycle de vie du processeur d'enregistrements (génération / arrêt) est effectuée en fonction du nombre de fragments dans le flux Kinesis. Le développeur de l'application doit générer et démarrer un ** worker ** avec les paramètres requis (tels que le flux Kinesis à traiter).
Il n'y a qu'un seul ** worker ** dans l'application KCL.
Tout d'abord, implémentez un ** processeur d'enregistrement ** pour traiter les enregistrements Kinesis.
Le processeur d'enregistrement est thread-safe. Chaque méthode du processeur d'enregistrement ne peut pas être appelée par plusieurs threads. Par conséquent, le processeur d'enregistrement peut avoir les informations nécessaires en tant que variable d'instance.
Implémente l'interface com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor
.
Implémentez la méthode de traitement initial void initialize (InitializationInput)
.
Si vous disposez des ressources nécessaires pour traiter l'enregistrement, initialisez-les ici.
A partir du paramètre ʻInitializationInput`, vous pouvez obtenir l'ID de la partition dont ce processeur d'enregistrement est en charge, alors gardez-le dans une variable d'instance si nécessaire.
Implémentez la méthode de traitement des enregistrements void processRecords (ProcessRecordsInput)
.
Les enregistrements de processus reçus des flux Kinesis.
Vous pouvez obtenir une liste des enregistrements reçus à partir du paramètre ProcessRecordsInput
.
Après avoir traité avec succès l'enregistrement, enregistrez le point de contrôle en utilisant le ** checkpointer ** qui peut être obtenu avec ProcessRecordsInput # getCheckpointer ()
.
Les points de contrôle sont enregistrés dans * DynamoDB *, de sorte que des exceptions telles qu'une capacité insuffisante peuvent se produire sous une charge importante. Vous devez réessayer lorsqu'une exception se produit pour vous assurer que les points de contrôle sont enregistrés.
Vous devez tenir compte du fait que le même enregistrement peut être traité plusieurs fois, par exemple lorsque vous arrêtez / redémarrez une application KCL. En d'autres termes, le traitement de l'enregistrement doit être gracieux.
Implémentez la méthode de terminaison void shutdown (ShutdownInput)
.
** Notez que cette méthode ne sera pas appelée tant que le #processRecords () n'aura pas été traité. ** **
Libérez les ressources sécurisées lors du traitement initial.
Enregistrez les points de contrôle uniquement si la raison de la résiliation renvoyée par ShutdownInput # getShutdownReason ()
est TERMINATE
.
Les points de contrôle sont enregistrés dans * DynamoDB *, de sorte que des exceptions telles qu'une capacité insuffisante peuvent se produire sous une charge importante. Vous devez réessayer lorsqu'une exception se produit pour vous assurer que les points de contrôle sont enregistrés.
ExampleRecordProcessor
public class ExampleRecordProcessor implements IRecordProcessor {
private final String tableName;
ExampleRecordProcessor(String tableName) {
this.tableName = tableName;
}
private String shardId;
private AmazonDynamoDB dynamoDB;
private Table table;
@Override
public void initialize(InitializationInput initializationInput) {
shardId = initializationInput.getShardId();
// Initialize any resources for #processRecords().
dynamoDB = AmazonDynamoDBClientBuilder.defaultClient();
table = new Table(dynamoDB, tableName);
}
@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
// Processing incoming records.
retry(() -> {
processRecordsInput.getRecords().forEach(record -> {
System.out.println(record);
});
});
// Record checkpoint if all incoming records processed successfully.
recordCheckpoint(processRecordsInput.getCheckpointer());
}
@Override
public void shutdown(ShutdownInput shutdownInput) {
// Record checkpoint at closing shard if shutdown reason is TERMINATE.
if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
recordCheckpoint(shutdownInput.getCheckpointer());
}
// Cleanup initialized resources.
Optional.ofNullable(dynamoDB).ifPresent(AmazonDynamoDB::shutdown);
}
private void recordCheckpoint(IRecordProcessorCheckpointer checkpointer) {
retry(() -> {
try {
checkpointer.checkpoint();
} catch (Throwable e) {
throw new RuntimeException("Record checkpoint failed.", e);
}
});
}
private void retry(Runnable f) {
try {
f.run();
} catch (Throwable e) {
System.out.println(String.format("An error occurred %s. That will be retry...", e.getMessage()));
try {
Thread.sleep(3000);
} catch (InterruptedException e2) {
e2.printStackTrace();
}
retry(f);
}
}
Ensuite, implémentez la ** Record Processor Factory ** qui sera utilisée pour générer le processeur d'enregistrement. ** Worker ** utilise cette usine pour générer un processeur d'enregistrement.
Implémentez l'interface com.amazonaws.services.kinesis.clientlibrary.interfaces.v2. IRecordProcessorFactory
.
Implémentez la méthode de création du processeur d'enregistrement ʻIRecordProcessor createProcessor () `.
Génère et renvoie une instance du ** processeur d'enregistrement ** implémenté ci-dessus.
ExampleRecordProcessorFactory
public class ExampleRecordProcessorFactory implements IRecordProcessorFactory {
private final String tableName;
ExampleRecordProcessorFactory(String tableName) {
this.tableName = tableName;
}
@Override
public IRecordProcessor createProcessor() {
return new ExampleRecordProcessor(tableName);
}
}
Enfin, à partir du point d'entrée (classe * Main *) de l'application KCL, créez et lancez un ** worker **.
Utilisez com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.Builder
pour générer des * ** workers **.
# build ()
.La capacité de lecture / écriture de la table DynamoDB pour l'enregistrement des points de contrôle, qui est automatiquement créée par * ** KCL **, est par défaut «10».
Si vous voulez le changer, vous pouvez le spécifier avec KinesisClientLibConfiguration
.
Worker # run ()
Démarre le ** worker ** généré.App
public class App {
public static void main(String... args) {
// Create a Worker.
final Worker worker = new Worker.Builder()
.recordProcessorFactory(
new ExampleRecordProcessorFactory("examples-table")
)
.config(
new KinesisClientLibConfiguration(
"kcl-java-example",
"kcl-sample",
DefaultAWSCredentialsProviderChain.getInstance(),
generateWorkerId()
).withRegionName("us-east-1")
.withInitialLeaseTableReadCapacity(1)
.withInitialLeaseTableWriteCapacity(1)
)
.build();
// Start the worker.
worker.run();
}
private static String generateWorkerId() {
try {
return InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
} catch (UnknownHostException e) {
throw new RuntimeException("Could not generate worker ID.", e);
}
}
}
Appelez Worker # startGracefulShutdown ()
pour arrêter en toute sécurité le ** worker ** démarré (s'il y a un enregistrement en cours, il terminera le traitement et enregistrera un point de contrôle avant de s'arrêter).
Normalement, un hook d'arrêt de machine virtuelle Java ([Runtime # addShutdownHook ()](https://docs.oracle.com/javase/jp/9/docs/api/java/lang/Runtime.html#addShutdownHook-java.lang. En appelant depuis Thread-)), vous pouvez arrêter en toute sécurité le ** worker ** à la fin du processus JVM dans votre application KCL.
App
final Worker worker = ...;
// Shutdown worker gracefully using shutdown hook.
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
worker.startGracefulShutdown().get();
} catch (Throwable e) {
e.printStackTrace();
}
}));
Si vous souhaitez arrêter en toute sécurité ** workers **, vous devez également implémenter un processeur d'enregistrement pour arrêter en toute sécurité.
** Additional implémente l'interface com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware
vers le processeur d'enregistrement **.
Implémentez la méthode de demande de résiliation void shutdownRequested (IRecordProcessorCheckpointer)
.
** Notez que cette méthode ne sera pas appelée tant que le #processRecords () n'aura pas été traité. ** **
Enregistrer les points de contrôle.
Après cette méthode, la méthode de traitement de terminaison #shutdown ()
mentionnée ci-dessus est également appelée, mais la raison de la résiliation est ZOMBIE
, donc un point de contrôle doit être enregistré dans cette méthode.
Les points de contrôle sont enregistrés dans * DynamoDB *, de sorte que des exceptions telles qu'une capacité insuffisante peuvent se produire sous une charge importante. Vous devez réessayer lorsqu'une exception se produit pour vous assurer que les points de contrôle sont enregistrés.
ExampleRecordProcessor
public class ExampleRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
:
@Override
public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
// Record checkpoint at graceful shutdown.
recordCheckpoint(checkpointer);
}
}
+-------------------+
| Waiting on Parent | +------------------+
+----+ Shard | | Shutdown |
| | | +--------------------+ Notification |
| +----------+--------+ | Shutdown: | Requested |
| | Success | Requested +-+-------+--------+
| | | | |
| +------+-------------+ | | | Shutdown:
| | Initializing +-----+ | | Requested
| | | | | |
| | +-----+-------+ | |
| +---------+----------+ | | Shutdown: | +-----+-------------+
| | Success | | Terminated | | Shutdown |
| | | | Zombie | | Notification +-------------+
| +------+-------------+ | | | | Complete | |
| | Processing +--+ | | ++-----------+------+ |
| +---+ | | | | | |
| | | +----------+ | | | Shutdown: |
| | +------+-------------+ | \ / | Requested |
| | | | \/ +--------------------+
| | | | ||
| | Success | | || Shutdown:
| +----------+ | || Terminated
| | || Zombie
| | ||
| | ||
| | +---++--------------+
| | | Shutting Down |
| +-----------+ |
| | |
| +--------+----------+
| |
| | Shutdown:
| | All Reasons
| |
| |
| Shutdown: +--------+----------+
| All Reasons | Shutdown |
+-------------------------------------------------------+ Complete |
| |
+-------------------+
1.Quand le travailleur commence | 2.Lors de la réception d'un enregistrement | 3.Au fragment CLOSE (* 1) | 4.Lorsque le fragment est ouvert (* 1) | 5.Lorsqu'un travailleur est arrêté en toute sécurité | |
---|---|---|---|---|---|
IRecordProcessor#initialize() |
① Sécurisation des ressources, etc. | - | - | ① Sécurisation des ressources, etc. | - |
IRecordProcessor#processRecords() |
- | ① Traitement des enregistrements reçus ② Enregistrement du point de contrôle |
- | - | - |
IRecordProcessor#shutdown() (※2) |
- | - | ① reason=TERMINATE Enregistrement des points de contrôle |
- | ② reason=ZOMBIE |
IShutdownNotificationAware#shutdownRequested() (※2) |
- | - | - | - | ① Enregistrement du point de contrôle |
# processRecords ()
est en cours d'exécution, # shutdown ()
et # shutdownRequested ()
ne seront pas appelés tant que le traitement de la commande# processRecords ()
ne sera pas terminé.Le comportement lorsqu'une exception est levée à partir de chaque méthode implémentée par le processeur d'enregistrement est le suivant.
Méthode | Comportement lors de la levée d'une exception |
---|---|
IRecordProcessor#initialize() |
Il continuera à être appelé à plusieurs reprises jusqu'à ce qu'il revienne normalement. |
IRecordProcessor#processRecords() |
Le journal des erreurs a été généré et passé en argumentIgnorer les enregistrementsSera fait. |
IRecordProcessor#shutdown() |
Il continuera à être appelé à plusieurs reprises jusqu'à ce qu'il revienne normalement. |
IShutdownNotificationAware#shutdownRequested() |
Il continuera à être appelé à plusieurs reprises jusqu'à ce qu'il revienne normalement. |
Si une erreur se produit dans le traitement de l'enregistrement (# processRecords ()
) du ** processeur d'enregistrement **, l'idée de base de ** KCL ** est de sauter l'enregistrement et de passer à l'enregistrement suivant. ..
Si vous ne souhaitez pas ignorer les enregistrements en raison des exigences de l'application (par exemple, devoir traiter les enregistrements dans l'ordre), ** KCL ** n'a pas de mécanisme, alors implémentez-le vous-même. est nécessaire.
Si # processRecords ()
rencontre une erreur, il continuera à réessayer le processus sans retourner ni lever d'exception.
Si l'erreur est temporaire (telle qu'une interruption AWS temporaire), elle sera automatiquement récupérée par une nouvelle tentative.
Si l'erreur est permanente (comme un enregistrement inattendu), elle ne sera pas automatiquement récupérée et continuera à réessayer.
Si vous arrivez dans un état où vous continuez à réessayer, l'enregistrement ne progressera plus (les enregistrements resteront), vous devez donc traiter l'enregistrement en modifiant l'application.
Si le ** worker ** est implémenté pour s'arrêter en toute sécurité lors de l'arrêt d'une application KCL qui est dans un état de nouvelle tentative, le processeur d'enregistrement continue de réessayer avec # processRecords ()
, donc #shutdownRequested () ʻand
#shutdown () `ne seront pas appelés et attendront la fin, mais le délai d'expiration du hook d'arrêt forcera la JVM à s'arrêter.
(Autrement dit, le processeur d'enregistrement n'enregistrera pas le point de contrôle et sera retraité à partir de l'enregistrement suivant par erreur)
La sortie du journal de ** KCL ** peut être contrôlée par les enregistreurs suivants.
Enregistreur | La description |
---|---|
com.amazonaws.services.kinesis.clientlibrary |
NormalementINFO 。KCLSi vous souhaitez afficher le journal de débogage deDEBUG 。 |
com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker |
INFO Si vous faites ce qui suitSleeping... Parce que c'est ennuyeux parce que le journal qui ditWARN Il est bon de le garder. |
La capacité de lecture / écriture de la table DynamoDB pour l'enregistrement des points de contrôle est fixée à la valeur initiale (par défaut «10») telle quelle, mais à mesure que le nombre d'enregistrements circulant dans le flux Kinesis augmente, le point de contrôle augmente. Les tables sont consultées plus souvent et peuvent manquer de capacité. Si vous manquez de capacité, vous obtiendrez des erreurs lors de l'enregistrement des points de contrôle, vous devrez donc augmenter votre capacité. Chaque fois que la capacité est insuffisante, augmentez manuellement la capacité ou la mise à l'échelle automatique.
La table des points de contrôle n'est pas supprimée automatiquement, vous devrez donc supprimer manuellement la table des points de contrôle lorsque vous interrompez l'application KCL.
Guide officiel AWS
Kinesis Client Library Consumer Development in Java ・ ・ ・ Instructions d'utilisation de ** KCL pour Java **.
Suivi du statut ・ ・ ・ Explication de chaque élément dans le tableau des points de contrôle.
Resharding, extension, parallel processing ・ ・ ・ Nombre de fragments dans le flux Kinesis Explication du comportement en cas de modification.
Gestion des enregistrements en double - Réessayer par le consommateur (https://docs.aws.amazon.com/ja_jp/streams/latest/dev/kinesis-record-processor-duplicates.html#kinesis-record-processor-duplicates -consumer) ・ ・ ・ Explication que le même enregistrement peut être traité plusieurs fois dans l'application KCL.
Récupération après l'échec d'Amazon Kinesis Data Streams ・ ・ ・ Application KCL Description de la gestion des erreurs.
[Développement de consommateurs en Java avec une répartition améliorée de Kinesis Client Library 2.x](https://docs.aws.amazon.com/ja_jp/streams/latest/dev/building-enhanced-consumers- kcl-java.html) ・ ・ ・ Explication sur la façon d'utiliser le déploiement étendu de Kinesis.
[(BDT403) Bonnes pratiques pour la création d'applications de streaming en temps réel avec Amazon Kinesis](https://www.slideshare.net/AmazonWebServices/bdt403-best-practices-for-building-realtime-streaming-applications-with-amazon -kinesis / 15) ・ ・ ・ À partir de P15, il y a des explications telles que les précautions pour la mise en œuvre des applications KCL.
Recommended Posts