Fortsetzung auf Tutorial: Echtzeitanalyse von Bestandsdaten mithilfe von Kinesis-Datenströmen.
Der Lernmodul-1-Zweig zum Lernen
// TODO: Implement method
Es gibt einen Teil, in dem die Zähne fehlen. Basierend auf diesem Zweig (es scheint, dass er jetzt nicht erstellt werden kann) werden wir mit dem Lernen fortfahren, während wir ihn mit dem Hauptzweig vergleichen.
Beginnen Sie mit Schritt 4: Implementieren Sie den Producer (https://docs.aws.amazon.com/ja_jp/streams/latest/dev/learning-kinesis-module-one-producer.html). Teil, der Datensätze mit dem AWS SDK für Java schreibt (KPL wird nicht verwendet). Wie im Tutorial beschrieben, ist die Vorgehensweise
Getan werden. Was fehlt Zähne
Nur.
writer.StockTradesWriter.SendStockTrade()
/**
*Senden Sie Bestands-Transaktionsinformationen mit dem Kinesis-Client an einen bestimmten Stream
*
* @param trade Instanz, die eine Aktientransaktion darstellt
* @param kinesisClient Kinesis-Client
* @param streamName Streamname
*/
private static void sendStockTrade(StockTrade trade, AmazonKinesis kinesisClient,
String streamName) {
byte[] bytes = trade.toJsonAsBytes();
// Jackson(JSON-Bibliothek)Es wurde die Möglichkeit behoben, dass Bytes null werden, wenn sie nicht funktionieren
if (bytes == null) {
LOG.warn("Could not get JSON bytes for stock trade");
return;
}
LOG.info("Putting trade: " + trade.toString());
PutRecordRequest putRecord = new PutRecordRequest();
putRecord.setStreamName(streamName);
//Verwenden Sie das Tickersymbol für den Partitionsschlüssel
putRecord.setPartitionKey(trade.getTickerSymbol());
putRecord.setData(ByteBuffer.wrap(bytes));
try {
kinesisClient.putRecord(putRecord);
} catch (AmazonClientException ex) {
LOG.warn("Error sending record to Amazon Kinesis.", ex);
}
}
Da die PutRecord-API in ein Byte-Array schreibt, lautet das Objekt, das Informationen zu Börsentransaktionen darstellt, [toJsonAsBytes ()
](https://github.com/aws-samples/amazon-kinesis-learning/blob/master/src Konvertieren Sie mit /com/amazonaws/services/kinesis/samples/stocktrades/model/StockTrade.java#L80 in ein Byte-Array (intern [writeValueAsBytes ()
](https: /) von jackson.databind.ObjectMapper
Verwenden Sie /fasterxml.github.io/jackson-databind/javadoc/2.7/com/fasterxml/jackson/databind/ObjectMapper.html#writeValueAsBytes(java.lang.Object))
[Tickersymbol] als Partitionsschlüssel (https://ja.wikipedia.org/wiki/%E3%83%86%E3%82%A3%E3%83%83%E3%82%AB%E3%83% BC% E3% 82% B7% E3% 83% B3% E3% 83% 9C% E3% 83% AB) wird verwendet. Laut dem Tutorial sind Hunderte oder Tausende von Partitionsschlüsseln eine Richtlinie für einen Shard (So bestimmen Sie den Shard des Speicherziels vorheriger Artikel leo-mon / items / 45602438bb3e9ad220ca #% E5% 87% BA% E5% 8A% 9B% E3% 83% 91% E3% 83% A9% E3% 83% A1% E3% 83% BC% E3% 82% BF % E3% 81% AB% E3% 81% A4% E3% 81% 84% E3% 81% A6))
Behobene Schreibfehler wie Schreiblimit pro Shard, API-Aufruflimit und NW-Verbindungsfehler (diesmal einfach zwischen try ... catch)
Dieses Mal habe ich die PutRecord-API verwendet. Wenn jedoch eine große Anzahl von Datensätzen generiert wird, sollten Sie die PutRecords-API verwenden, um mehrere Datensätze gleichzeitig zu senden (Einzelheiten finden Sie unter [Daten zum Stream hinzufügen](https ::)). (/docs.aws.amazon.com/ja_jp/streams/latest/dev/developing-producers-with-sdk.html#kinesis-using-sdk-java-add-data-to-stream))
Fahren Sie dann mit Schritt 5: Implementieren von Verbrauchern fort (https://docs.aws.amazon.com/ja_jp/streams/latest/dev/learning-kinesis-module-one-consumer.html). Was fehlt Zähne
processor.StockTradeRecordProcessor.reportStats()
processor.StockTradeRecordProcessor.resetStats()
processor.StockTradeRecordProcessor.processRecord()
Rufen Sie diese zum Verständnis von [process.StockTradeProcessor
] auf (https://github.com/aws-samples/amazon-kinesis-learning/blob/learning-module-1/src/com/). Ich werde aus der Klasse amazonaws / services / kinesis / samples / stocktrades / processor / StockTradesProcessor.java) folgen.
Extrahieren wir den wichtigen Teil der main ()
Methode
public static void main(String[] args) throws Exception {
...
//KCL-Einstellungen
KinesisClientLibConfiguration kclConfig =
new KinesisClientLibConfiguration(applicationName, streamName, credentialsProvider, workerId)
.withRegionName(region.getName())
.withCommonClientConfig(ConfigurationUtils.getClientConfigWithUserAgent());
//Klassenfactory für die IRecordProcessor-Schnittstelle
IRecordProcessorFactory recordProcessorFactory = new StockTradeRecordProcessorFactory();
//Einen Arbeiter erstellen
Worker worker = new Worker(recordProcessorFactory, kclConfig);
int exitCode = 0;
try {
//Arbeiter läuft
worker.run();
} catch (Throwable t) {
LOG.error("Caught throwable while processing data.", t);
exitCode = 1;
}
System.exit(exitCode);
}
Die Ausführung von KCL kann grob aufgeteilt werden.
Es gibt 3 Schritte. Die zweite Klasse, die Klasse mit der RecordProcessor-Schnittstelle und ihren Fabriken (in diesem Fall "StockTradeRecordProcessor" und "StockTradeRecordProcessorFactory"), unterliegt der Benutzerimplementierung.
processor.StockTradeRecordProcessor
Es war leer [processor.StockTradeRecordProcessor.reportStats ()
](https://github.com/aws-samples/amazon-kinesis-learning/blob/learning-module-1/src/com/amazonaws/services /kinesis/samples/stocktrades/processor/StockTradeRecordProcessor.java#L88), [process.StockTradeRecordProcessor.resetStats ()
](https://github.com/aws-samples/amazon-kinesis-learning/blob/learning- Modul-1 / src / com / amazonaws / services / kinesis / samples / stocktrades / processor / StockTradeRecordProcessor.java # L92), [process.StockTradeRecordProcessor.processRecord ()
](https://github.com/aws-samples Diese Klasse hat /amazon-kinesis-learning/blob/learning-module-1/src/com/amazonaws/services/kinesis/samples/stocktrades/processor/StockTradeRecordProcessor.java#L96).
StockTradeRecordProcessor.java
public class StockTradeRecordProcessor implements IRecordProcessor {
...
/**
*Initialisierungsmethode, die die von der Instanz zu verarbeitende Shard-ID empfängt
*/
@Override
public void initialize(String shardId) {
LOG.info("Initializing record processor for shard: " + shardId);
this.kinesisShardId = shardId;
nextReportingTimeInMillis = System.currentTimeMillis() + REPORTING_INTERVAL_MILLIS;
nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS;
}
/**
*Methode zum Verarbeiten des abgerufenen Datensatzes
*/
@Override
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
for (Record record : records) {
//Prozessaufzeichnung
processRecord(record);
}
//Melden Sie, wann der für das Intervall festgelegte Zeitraum abgelaufen ist
if (System.currentTimeMillis() > nextReportingTimeInMillis) {
reportStats();
resetStats();
nextReportingTimeInMillis = System.currentTimeMillis() + REPORTING_INTERVAL_MILLIS;
}
//Prüfpunkt jedes Prüfpunktintervall
if (System.currentTimeMillis() > nextCheckpointTimeInMillis) {
checkpoint(checkpointer);
nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS;
}
}
private void reportStats() {
// TODO: Implement method
System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" +
stockStats + "\n" +
"****************************************************************\n");
}
private void resetStats() {
// TODO: Implement method
stockStats = new StockStats();
}
private void processRecord(Record record) {
// TODO: Implement method
//Datensatz vom Byte-Array zum Objekt
StockTrade trade = StockTrade.fromJsonAsBytes(record.getData().array());
if (trade == null) {
LOG.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.getPartitionKey());
return;
}
stockStats.addStockTrade(trade);
}
/**
*Methode, die aufgerufen wird, wenn die Verarbeitung abgeschlossen ist oder keine Antwort erfolgt
*/
@Override
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
LOG.info("Shutting down record processor for shard: " + kinesisShardId);
//Sie können mit der Verarbeitung von Daten aus untergeordneten Shards beginnen, indem Sie den Kontrollpunkt markieren, wenn Sie das Ende des Shards erreichen.
if (reason == ShutdownReason.TERMINATE) {
checkpoint(checkpointer);
}
}
private void checkpoint(IRecordProcessorCheckpointer checkpointer) {
LOG.info("Checkpointing shard " + kinesisShardId);
try {
checkpointer.checkpoint();
} catch (ShutdownException se) {
//Prüfpunkt beim Herunterfahren der Prozessorinstanz ignorieren (Fehler)
LOG.info("Caught shutdown exception, skipping checkpoint.", se);
} catch (ThrottlingException e) {
//Überprüfen Sie nicht, ob eine Drosselung vorliegt.Betrachten Sie Backoff und versuchen Sie es im tatsächlichen Betrieb erneut.
LOG.error("Caught throttling exception, skipping checkpoint.", e);
} catch (InvalidStateException e) {
//Fehler, wenn ein Problem mit DynamoDB vorliegt
LOG.error("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", e);
}
}
}
Diese Klasse wird implementiert, um die "IRecordProcessor" -Schnittstelle zu erfüllen. Für "IRecordProcessor"
public void initialize(String shardId)
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer)
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)
(* Die in diesem Lernprogramm verwendete Benutzeroberfläche ist Version 1 und jetzt komplexer Version 2. Es gibt die neueste / dev / kinesis-Datensatz-Prozessor-Implementierung-App-java.html # kcl-java-interface-v2)).
Von diesen war die in "processRecords ()" verwendete Methode das Ziel des Tutorials.
Selbst eine solch einfache Anwendung kann mithilfe von KCL einfach implementiert werden, indem Sie einfach mit processRecords ()
herumspielen. Ebenfalls
Wenn ein neuer Datensatz verfügbar wird, ruft KCL den Datensatz ab und ruft den Datensatzprozessor auf, sodass Sie sich keine Gedanken darüber machen müssen, wie der Datensatz aus Kinesis Data Streams abgerufen werden soll. Sie müssen sich auch keine Gedanken über die Anzahl der Shards oder Consumer-Instanzen machen. Wenn der Stream vergrößert wird, müssen Sie Ihre Anwendung nicht neu schreiben, um mehrere Shards oder Consumer-Instanzen zu verarbeiten.
Es gibt einen Verdienst, der sagte.
Dieses Tutorial behandelt:
for (Record record : records) {
processRecord(record);
}
...
StockTrade trade = StockTrade.fromJsonAsBytes(record.getData().array());
Die Klasse "Record" selbst ist in der [AWS SDK-Dokumentation] beschrieben ((https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/model/Record.html)). Ja.
Wenn Sie getData () aufrufen, wird die Klasse ByteBuffer
zurückgegeben Da es kommen wird, geben Sie es mit array ()
in ein Byte-Array aus FromJsonAsBytes
Konvertiert es in ein Jackson-Objekt, damit es verarbeitet werden kann. (Intern Jacksons [readValue ()
](https://fasterxml.github.io/jackson-databind/javadoc/2.7/com/fasterxml/jackson/databind/ObjectMapper.html#readValue(byte [],%) Ich lese 20java.lang.Class)).)
Es zeigt einen bestimmten Punkt im Stream an. ([Details der in DynamoDB geschriebenen Daten](https://docs.aws.amazon.com/ja_jp/streams/latest/dev/kinesis-record-processor-ddb.html#kinesis-record-processor-ddb- Tabelleninhalt) war leicht zu verstehen) Der "Checkpointer", der jedes Mal vom Worker übergeben wird, wenn "processRecords ()" aufgerufen wird, gibt an, wo der Worker gerade verarbeitet (Code wahrscheinlich dieser Bereich kinesis-client / blob / v1.x / src / main / java / com / amazonaws / services / kinesis / clientlibrary / lib / worker / RecordProcessorCheckpointer.java)). In diesem Lernprogramm wird die Verarbeitungsposition alle angegebenen Sekunden angegeben. Es ist wahrscheinlich, dass unterschiedliche Strategien für den vollständigen Einsatz angewendet werden.
Damit sind die drei Tutorials abgeschlossen, und Sie können lernen, wie Sie die PutRecord-API im AWS SDK für Java verwenden und wie Sie KCL einfach verwenden. Obwohl ich die Gliederung verstehen konnte, hatte ich den Eindruck, dass die detaillierte Verwendung nicht allein aus dem Tutorial gelernt werden konnte und dass die Dokumentation und der Quellcode selbst aufgerufen werden mussten.
Es scheint lange her zu sein, dass dieses Tutorial selbst geschrieben wurde, und Sie können sehen, dass die verwendete Benutzeroberfläche und Version ebenfalls alt sind. Beim nächsten Mal werde ich versuchen, verschiedene Stellen gemäß diesem Tutorial zu aktualisieren.
Recommended Posts