Suite sur Tutoriel: Analyse en temps réel des données boursières à l'aide de Kinesis Data Streams.
La branche learning-module-1 pour l'apprentissage
// TODO: Implement method
Il y a une partie où les dents manquent. Sur la base de cette branche (il semble qu'elle ne peut pas être construite maintenant), nous allons procéder à l'apprentissage en la comparant avec la branche master.
Commencez par l'étape 4: implémentez le producteur (https://docs.aws.amazon.com/ja_jp/streams/latest/dev/learning-kinesis-module-one-producer.html). Pièce qui écrit des enregistrements à l'aide du kit AWS SDK pour Java (KPL n'est pas utilisé). Comme décrit dans le didacticiel, la procédure est
Sera fait. Quelles sont les dents manquantes
Seulement.
writer.StockTradesWriter.SendStockTrade()
/**
*Envoyer des informations de transaction boursière à un flux donné à l'aide du client Kinesis
*
* @Param trade Instance représentant une transaction boursière
* @param kinesisClient client Kinesis
* @param streamName Nom du flux
*/
private static void sendStockTrade(StockTrade trade, AmazonKinesis kinesisClient,
String streamName) {
byte[] bytes = trade.toJsonAsBytes();
// Jackson(Bibliothèque JSON)Résolution de la possibilité que les octets deviennent nuls lorsqu'ils ne fonctionnaient pas
if (bytes == null) {
LOG.warn("Could not get JSON bytes for stock trade");
return;
}
LOG.info("Putting trade: " + trade.toString());
PutRecordRequest putRecord = new PutRecordRequest();
putRecord.setStreamName(streamName);
//Utiliser le symbole boursier pour la clé de partition
putRecord.setPartitionKey(trade.getTickerSymbol());
putRecord.setData(ByteBuffer.wrap(bytes));
try {
kinesisClient.putRecord(putRecord);
} catch (AmazonClientException ex) {
LOG.warn("Error sending record to Amazon Kinesis.", ex);
}
}
Puisque l'API PutRecord écrit dans un tableau d'octets, l'objet représentant les informations de transaction boursière est [toJsonAsBytes ()
](https://github.com/aws-samples/amazon-kinesis-learning/blob/master/src Convertir en tableau d'octets en utilisant /com/amazonaws/services/kinesis/samples/stocktrades/model/StockTrade.java#L80) (en interne [writeValueAsBytes ()
](https: /) de jackson.databind.ObjectMapper
Utilisez /fasterxml.github.io/jackson-databind/javadoc/2.7/com/fasterxml/jackson/databind/ObjectMapper.html#writeValueAsBytes(java.lang.Object))
[Symbole boursier] comme clé de partition (https://ja.wikipedia.org/wiki/%E3%83%86%E3%82%A3%E3%83%83%E3%82%AB%E3%83% BC% E3% 82% B7% E3% 83% B3% E3% 83% 9C% E3% 83% AB) est utilisé. Selon le didacticiel, des centaines ou des milliers de clés de partition sont un guide pour une partition (Comment déterminer la partition de stockage article précédent leo-mon / items / 45602438bb3e9ad220ca #% E5% 87% BA% E5% 8A% 9B% E3% 83% 91% E3% 83% A9% E3% 83% A1% E3% 83% BC% E3% 82% BF % E3% 81% AB% E3% 81% A4% E3% 81% 84% E3% 81% A6))
Correction des échecs d'écriture tels que la limite d'écriture par partition, la limite d'appel d'API et l'erreur de connexion NW (cette fois, placez-le simplement entre try ... catch)
Cette fois, j'ai utilisé l'API PutRecord, mais si un grand nombre d'enregistrements sont générés, envisagez d'utiliser l'API PutRecords pour envoyer plusieurs enregistrements à la fois (pour plus de détails, [Ajouter des données au flux](https :: //docs.aws.amazon.com/ja_jp/streams/latest/dev/developing-producers-with-sdk.html#kinesis-using-sdk-java-add-data-to-stream)
Passez ensuite à l'étape 5: implémenter les consommateurs (https://docs.aws.amazon.com/ja_jp/streams/latest/dev/learning-kinesis-module-one-consumer.html). Quelles sont les dents manquantes
processor.StockTradeRecordProcessor.reportStats()
processor.StockTradeRecordProcessor.resetStats()
processor.StockTradeRecordProcessor.processRecord()
Appelez-les pour comprendre [processor.StockTradeProcessor
](https://github.com/aws-samples/amazon-kinesis-learning/blob/learning-module-1/src/com/ Je suivrai de la classe amazonaws / services / kinesis / samples / stocktrades / processor / StockTradesProcessor.java).
Extrayons la partie importante de la méthode main ()
public static void main(String[] args) throws Exception {
...
//Paramètres KCL
KinesisClientLibConfiguration kclConfig =
new KinesisClientLibConfiguration(applicationName, streamName, credentialsProvider, workerId)
.withRegionName(region.getName())
.withCommonClientConfig(ConfigurationUtils.getClientConfigWithUserAgent());
//Usine de classe pour l'interface IRecordProcessor
IRecordProcessorFactory recordProcessorFactory = new StockTradeRecordProcessorFactory();
//Créer un travailleur
Worker worker = new Worker(recordProcessorFactory, kclConfig);
int exitCode = 0;
try {
//ouvrier en cours d'exécution
worker.run();
} catch (Throwable t) {
LOG.error("Caught throwable while processing data.", t);
exitCode = 1;
}
System.exit(exitCode);
}
L'exécution de KCL peut être grossièrement divisée.
Il y a 3 étapes. Le second d'entre eux, la classe avec l'interface RecordProcessor et ses usines (dans ce cas «StockTradeRecordProcessor» et «StockTradeRecordProcessorFactory»), sont soumis à l'implémentation par l'utilisateur.
processor.StockTradeRecordProcessor
Il était vide [processor.StockTradeRecordProcessor.reportStats ()
](https://github.com/aws-samples/amazon-kinesis-learning/blob/learning-module-1/src/com/amazonaws/services /kinesis/samples/stocktrades/processor/StockTradeRecordProcessor.java#L88), [processor.StockTradeRecordProcessor.resetStats ()
](https://github.com/aws-samples/amazon-kinesis-learning/blob/learning- module-1 / src / com / amazonaws / services / kinesis / samples / stocktrades / processor / StockTradeRecordProcessor.java # L92), [processor.StockTradeRecordProcessor.processRecord ()
](https://github.com/aws-samples Cette classe a /amazon-kinesis-learning/blob/learning-module-1/src/com/amazonaws/services/kinesis/samples/stocktrades/processor/StockTradeRecordProcessor.java#L96).
StockTradeRecordProcessor.java
public class StockTradeRecordProcessor implements IRecordProcessor {
...
/**
*Méthode d'initialisation qui reçoit l'ID de partition à traiter par l'instance
*/
@Override
public void initialize(String shardId) {
LOG.info("Initializing record processor for shard: " + shardId);
this.kinesisShardId = shardId;
nextReportingTimeInMillis = System.currentTimeMillis() + REPORTING_INTERVAL_MILLIS;
nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS;
}
/**
*Méthode de traitement de l'enregistrement récupéré
*/
@Override
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
for (Record record : records) {
//Enregistrement de processus
processRecord(record);
}
//Signaler lorsque la période définie pour l'intervalle s'est écoulée
if (System.currentTimeMillis() > nextReportingTimeInMillis) {
reportStats();
resetStats();
nextReportingTimeInMillis = System.currentTimeMillis() + REPORTING_INTERVAL_MILLIS;
}
//point de contrôle à chaque intervalle de point de contrôle
if (System.currentTimeMillis() > nextCheckpointTimeInMillis) {
checkpoint(checkpointer);
nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS;
}
}
private void reportStats() {
// TODO: Implement method
System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" +
stockStats + "\n" +
"****************************************************************\n");
}
private void resetStats() {
// TODO: Implement method
stockStats = new StockStats();
}
private void processRecord(Record record) {
// TODO: Implement method
//Enregistrer du tableau d'octets à l'objet
StockTrade trade = StockTrade.fromJsonAsBytes(record.getData().array());
if (trade == null) {
LOG.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.getPartitionKey());
return;
}
stockStats.addStockTrade(trade);
}
/**
*Méthode appelée lorsque le traitement est terminé ou qu'il n'y a pas de réponse
*/
@Override
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
LOG.info("Shutting down record processor for shard: " + kinesisShardId);
//Vous pouvez commencer à traiter les données des fragments enfants par point de contrôle lorsque vous atteignez la fin du fragment.
if (reason == ShutdownReason.TERMINATE) {
checkpoint(checkpointer);
}
}
private void checkpoint(IRecordProcessorCheckpointer checkpointer) {
LOG.info("Checkpointing shard " + kinesisShardId);
try {
checkpointer.checkpoint();
} catch (ShutdownException se) {
//Ignorer le point de contrôle lorsque l'instance de processeur est arrêtée (échec)
LOG.info("Caught shutdown exception, skipping checkpoint.", se);
} catch (ThrottlingException e) {
//Ne pas vérifier s'il y a une limitation.Envisagez une interruption et réessayez en fonctionnement réel.
LOG.error("Caught throttling exception, skipping checkpoint.", e);
} catch (InvalidStateException e) {
//Erreur s'il y a un problème avec DynamoDB
LOG.error("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", e);
}
}
}
Cette classe est implémentée pour satisfaire l'interface de ʻIRecordProcessor. ʻIRecordProcessor
public void initialize(String shardId)
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer)
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)
(* L'interface utilisée dans ce tutoriel est la version 1, et maintenant elle est plus sophistiquée version 2. Il y a le dernier / dev / kinesis-record-processor-implementation-app-java.html # kcl-java-interface-v2)).
Parmi ceux-ci, la méthode utilisée dans processRecords ()
était la cible du tutoriel.
Même une application aussi simple peut être facilement implémentée en utilisant KCL en jouant simplement avec processRecords ()
. Aussi
Lorsqu'un nouvel enregistrement devient disponible, KCL récupère l'enregistrement et appelle le processeur d'enregistrement afin que vous n'ayez pas à vous soucier de la manière de récupérer l'enregistrement à partir de Kinesis Data Streams. Vous n'avez pas non plus à vous soucier du nombre de fragments ou d'instances de consommateurs. Au fur et à mesure que le flux évolue, vous n'avez pas à réécrire votre application pour gérer plusieurs fragments ou instances de consommateurs.
Il y a un mérite à dire.
Liste <Enregistrement>
`reçueCe tutoriel traite:
for (Record record : records) {
processRecord(record);
}
...
StockTrade trade = StockTrade.fromJsonAsBytes(record.getData().array());
La classe Record
elle-même est décrite dans la documentation du kit SDK AWS. Oui.
L'appel de getData ()
renvoie la classe ByteBuffer
Puisqu'il viendra, affichez-le dans un tableau d'octets avec [ʻarray () ](https://docs.oracle.com/javase/6/docs/api/java/nio/ByteBuffer.html#array ()) et il [
FromJsonAsBytes](https://github.com/aws-samples/amazon-kinesis-learning/blob/master/src/com/amazonaws/services/kinesis/samples/stocktrades/model/StockTrade.java#L88 ) Le convertit en objet Jackson afin qu'il puisse être manipulé. (En interne Jackson's [
readValue ()`](https://fasterxml.github.io/jackson-databind/javadoc/2.7/com/fasterxml/jackson/databind/ObjectMapper.html#readValue(byte [],%) Je lis 20java.lang.Class)).)
Il indique un point spécifique du flux. ([Détails des données écrites dans DynamoDB](https://docs.aws.amazon.com/ja_jp/streams/latest/dev/kinesis-record-processor-ddb.html#kinesis-record-processor-ddb- table-matières) était facile à comprendre)
Le checkpointer
passé par le worker à chaque fois` processRecords () ʻest appelé indique où le worker est en train de traiter (code probablement [cette zone](https://github.com/awslabs/amazon- kinesis-client / blob / v1.x / src / main / java / com / amazonaws / services / kinesis / clientlibrary / lib / worker / RecordProcessorCheckpointer.java)). Ce didacticiel consiste à signaler la position de traitement chaque nombre de secondes spécifié. Il est probable que différentes stratégies seront adoptées pour une utilisation à grande échelle.
Cela complète les trois didacticiels et vous pouvez apprendre à utiliser l'API PutRecord dans le kit AWS SDK pour Java et à utiliser KCL facilement. Bien que j'aie pu comprendre le plan, j'ai eu l'impression que l'utilisation détaillée ne pouvait pas être tirée du didacticiel seul, et qu'il était nécessaire de consulter la documentation et le code source lui-même.
Il semble que cela fait longtemps que ce tutoriel lui-même n'a pas été écrit, et vous pouvez voir que l'interface et la version utilisées sont également anciennes. Alors la prochaine fois, j'essaierai de mettre à jour différents endroits selon ce tutoriel.
Recommended Posts