Kinesis-Datenströme ohne Java-Erfahrung (3.2)

Fortsetzung auf Tutorial: Echtzeitanalyse von Bestandsdaten mithilfe von Kinesis-Datenströmen.

Der Lernmodul-1-Zweig zum Lernen

// TODO: Implement method

Es gibt einen Teil, in dem die Zähne fehlen. Basierend auf diesem Zweig (es scheint, dass er jetzt nicht erstellt werden kann) werden wir mit dem Lernen fortfahren, während wir ihn mit dem Hauptzweig vergleichen.

Herstellerimplementierung

Beginnen Sie mit Schritt 4: Implementieren Sie den Producer (https://docs.aws.amazon.com/ja_jp/streams/latest/dev/learning-kinesis-module-one-producer.html). Teil, der Datensätze mit dem AWS SDK für Java schreibt (KPL wird nicht verwendet). Wie im Tutorial beschrieben, ist die Vorgehensweise

  1. Geben Sie den Streamnamen und den Regionsnamen ein
  2. Erstellen Sie ClientBuilder
  3. Legen Sie Region, Anmeldeinformationen und Client-Konfiguration fest
  4. Konfigurieren Sie den Kinesis-Client über Client Builder
  5. Überprüfen Sie den Status des Streams
  6. Senden Sie alle 100 Millisekunden zufällige Transaktionen zum Streamen

Getan werden. Was fehlt Zähne

Nur.

writer.StockTradesWriter.SendStockTrade()

/**
 *Senden Sie Bestands-Transaktionsinformationen mit dem Kinesis-Client an einen bestimmten Stream
 *
 * @param trade Instanz, die eine Aktientransaktion darstellt
 * @param kinesisClient Kinesis-Client
 * @param streamName Streamname
 */
private static void sendStockTrade(StockTrade trade, AmazonKinesis kinesisClient,
            String streamName) {
    byte[] bytes = trade.toJsonAsBytes();
    // Jackson(JSON-Bibliothek)Es wurde die Möglichkeit behoben, dass Bytes null werden, wenn sie nicht funktionieren
    if (bytes == null) {
        LOG.warn("Could not get JSON bytes for stock trade");
        return;
    }

    LOG.info("Putting trade: " + trade.toString());
    PutRecordRequest putRecord = new PutRecordRequest();
    putRecord.setStreamName(streamName);
    //Verwenden Sie das Tickersymbol für den Partitionsschlüssel
    putRecord.setPartitionKey(trade.getTickerSymbol());
    putRecord.setData(ByteBuffer.wrap(bytes));

    try {
        kinesisClient.putRecord(putRecord);
    } catch (AmazonClientException ex) {
        LOG.warn("Error sending record to Amazon Kinesis.", ex);
    }
}

Verbraucherimplementierung

Fahren Sie dann mit Schritt 5: Implementieren von Verbrauchern fort (https://docs.aws.amazon.com/ja_jp/streams/latest/dev/learning-kinesis-module-one-consumer.html). Was fehlt Zähne

Rufen Sie diese zum Verständnis von [process.StockTradeProcessor] auf (https://github.com/aws-samples/amazon-kinesis-learning/blob/learning-module-1/src/com/). Ich werde aus der Klasse amazonaws / services / kinesis / samples / stocktrades / processor / StockTradesProcessor.java) folgen.

processor.StockTradeProcessor

Extrahieren wir den wichtigen Teil der main () Methode

public static void main(String[] args) throws Exception {

    ...

    //KCL-Einstellungen
    KinesisClientLibConfiguration kclConfig =
            new KinesisClientLibConfiguration(applicationName, streamName, credentialsProvider, workerId)
        .withRegionName(region.getName())
        .withCommonClientConfig(ConfigurationUtils.getClientConfigWithUserAgent());
    
    //Klassenfactory für die IRecordProcessor-Schnittstelle
    IRecordProcessorFactory recordProcessorFactory = new StockTradeRecordProcessorFactory();

    //Einen Arbeiter erstellen
    Worker worker = new Worker(recordProcessorFactory, kclConfig);

    int exitCode = 0;
    try {
        //Arbeiter läuft
        worker.run();
    } catch (Throwable t) {
        LOG.error("Caught throwable while processing data.", t);
        exitCode = 1;
    }
    System.exit(exitCode);

}

Die Ausführung von KCL kann grob aufgeteilt werden.

  1. ** Durch KinesisClientLibConfiguration festlegen **
  2. ** Erstellen Sie eine Funktion (Factory), die eine Klasse zurückgibt, die die IRecordProcessor-Schnittstelle implementiert. **
  3. ** Erstellen und Ausführen eines Workers mit 1 und 2 als Argumenten **

Es gibt 3 Schritte. Die zweite Klasse, die Klasse mit der RecordProcessor-Schnittstelle und ihren Fabriken (in diesem Fall "StockTradeRecordProcessor" und "StockTradeRecordProcessorFactory"), unterliegt der Benutzerimplementierung.

processor.StockTradeRecordProcessor Es war leer [processor.StockTradeRecordProcessor.reportStats ()](https://github.com/aws-samples/amazon-kinesis-learning/blob/learning-module-1/src/com/amazonaws/services /kinesis/samples/stocktrades/processor/StockTradeRecordProcessor.java#L88), [process.StockTradeRecordProcessor.resetStats ()](https://github.com/aws-samples/amazon-kinesis-learning/blob/learning- Modul-1 / src / com / amazonaws / services / kinesis / samples / stocktrades / processor / StockTradeRecordProcessor.java # L92), [process.StockTradeRecordProcessor.processRecord ()](https://github.com/aws-samples Diese Klasse hat /amazon-kinesis-learning/blob/learning-module-1/src/com/amazonaws/services/kinesis/samples/stocktrades/processor/StockTradeRecordProcessor.java#L96).

StockTradeRecordProcessor.java


public class StockTradeRecordProcessor implements IRecordProcessor {

    ...

    /**
     *Initialisierungsmethode, die die von der Instanz zu verarbeitende Shard-ID empfängt
     */
    @Override
    public void initialize(String shardId) {
        LOG.info("Initializing record processor for shard: " + shardId);
        this.kinesisShardId = shardId;
        nextReportingTimeInMillis = System.currentTimeMillis() + REPORTING_INTERVAL_MILLIS;
        nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS;
    }

    /**
     *Methode zum Verarbeiten des abgerufenen Datensatzes
     */
    @Override
    public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
        for (Record record : records) {
            //Prozessaufzeichnung
            processRecord(record);
        }

        //Melden Sie, wann der für das Intervall festgelegte Zeitraum abgelaufen ist
        if (System.currentTimeMillis() > nextReportingTimeInMillis) {
            reportStats();
            resetStats();
            nextReportingTimeInMillis = System.currentTimeMillis() + REPORTING_INTERVAL_MILLIS;
        }

        //Prüfpunkt jedes Prüfpunktintervall
        if (System.currentTimeMillis() > nextCheckpointTimeInMillis) {
            checkpoint(checkpointer);
            nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS;
        }
    }
    private void reportStats() {
        // TODO: Implement method
        System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" +
                stockStats + "\n" +
                "****************************************************************\n");
    }
    private void resetStats() {
        // TODO: Implement method
        stockStats = new StockStats();
    }
    private void processRecord(Record record) {
        // TODO: Implement method
        //Datensatz vom Byte-Array zum Objekt
        StockTrade trade = StockTrade.fromJsonAsBytes(record.getData().array());
        if (trade == null) {
            LOG.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.getPartitionKey());
            return;
        }
        stockStats.addStockTrade(trade);
    }

    /**
     *Methode, die aufgerufen wird, wenn die Verarbeitung abgeschlossen ist oder keine Antwort erfolgt
     */
    @Override
    public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
        LOG.info("Shutting down record processor for shard: " + kinesisShardId);
        //Sie können mit der Verarbeitung von Daten aus untergeordneten Shards beginnen, indem Sie den Kontrollpunkt markieren, wenn Sie das Ende des Shards erreichen.
        if (reason == ShutdownReason.TERMINATE) {
            checkpoint(checkpointer);
        }
    }

    private void checkpoint(IRecordProcessorCheckpointer checkpointer) {
        LOG.info("Checkpointing shard " + kinesisShardId);
        try {
            checkpointer.checkpoint();
        } catch (ShutdownException se) {
            //Prüfpunkt beim Herunterfahren der Prozessorinstanz ignorieren (Fehler)
            LOG.info("Caught shutdown exception, skipping checkpoint.", se);
        } catch (ThrottlingException e) {
            //Überprüfen Sie nicht, ob eine Drosselung vorliegt.Betrachten Sie Backoff und versuchen Sie es im tatsächlichen Betrieb erneut.
            LOG.error("Caught throttling exception, skipping checkpoint.", e);
        } catch (InvalidStateException e) {
            //Fehler, wenn ein Problem mit DynamoDB vorliegt
            LOG.error("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", e);
        }
    }

}

Diese Klasse wird implementiert, um die "IRecordProcessor" -Schnittstelle zu erfüllen. Für "IRecordProcessor"

public void initialize(String shardId)
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer)
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) 

(* Die in diesem Lernprogramm verwendete Benutzeroberfläche ist Version 1 und jetzt komplexer Version 2. Es gibt die neueste / dev / kinesis-Datensatz-Prozessor-Implementierung-App-java.html # kcl-java-interface-v2)).

Von diesen war die in "processRecords ()" verwendete Methode das Ziel des Tutorials. Selbst eine solch einfache Anwendung kann mithilfe von KCL einfach implementiert werden, indem Sie einfach mit processRecords () herumspielen. Ebenfalls

Wenn ein neuer Datensatz verfügbar wird, ruft KCL den Datensatz ab und ruft den Datensatzprozessor auf, sodass Sie sich keine Gedanken darüber machen müssen, wie der Datensatz aus Kinesis Data Streams abgerufen werden soll. Sie müssen sich auch keine Gedanken über die Anzahl der Shards oder Consumer-Instanzen machen. Wenn der Stream vergrößert wird, müssen Sie Ihre Anwendung nicht neu schreiben, um mehrere Shards oder Consumer-Instanzen zu verarbeiten.

Es gibt einen Verdienst, der sagte.

Ergänzung

Behandlung der empfangenen Liste

Dieses Tutorial behandelt:

for (Record record : records) {
    processRecord(record);
}
...
StockTrade trade = StockTrade.fromJsonAsBytes(record.getData().array());

Die Klasse "Record" selbst ist in der [AWS SDK-Dokumentation] beschrieben ((https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/model/Record.html)). Ja. Wenn Sie getData () aufrufen, wird die Klasse ByteBuffer zurückgegeben Da es kommen wird, geben Sie es mit array () in ein Byte-Array aus FromJsonAsBytes Konvertiert es in ein Jackson-Objekt, damit es verarbeitet werden kann. (Intern Jacksons [readValue ()](https://fasterxml.github.io/jackson-databind/javadoc/2.7/com/fasterxml/jackson/databind/ObjectMapper.html#readValue(byte [],%) Ich lese 20java.lang.Class)).)

Über Checkpoint

Es zeigt einen bestimmten Punkt im Stream an. ([Details der in DynamoDB geschriebenen Daten](https://docs.aws.amazon.com/ja_jp/streams/latest/dev/kinesis-record-processor-ddb.html#kinesis-record-processor-ddb- Tabelleninhalt) war leicht zu verstehen) Der "Checkpointer", der jedes Mal vom Worker übergeben wird, wenn "processRecords ()" aufgerufen wird, gibt an, wo der Worker gerade verarbeitet (Code wahrscheinlich dieser Bereich kinesis-client / blob / v1.x / src / main / java / com / amazonaws / services / kinesis / clientlibrary / lib / worker / RecordProcessorCheckpointer.java)). In diesem Lernprogramm wird die Verarbeitungsposition alle angegebenen Sekunden angegeben. Es ist wahrscheinlich, dass unterschiedliche Strategien für den vollständigen Einsatz angewendet werden.


Damit sind die drei Tutorials abgeschlossen, und Sie können lernen, wie Sie die PutRecord-API im AWS SDK für Java verwenden und wie Sie KCL einfach verwenden. Obwohl ich die Gliederung verstehen konnte, hatte ich den Eindruck, dass die detaillierte Verwendung nicht allein aus dem Tutorial gelernt werden konnte und dass die Dokumentation und der Quellcode selbst aufgerufen werden mussten.

Es scheint lange her zu sein, dass dieses Tutorial selbst geschrieben wurde, und Sie können sehen, dass die verwendete Benutzeroberfläche und Version ebenfalls alt sind. Beim nächsten Mal werde ich versuchen, verschiedene Stellen gemäß diesem Tutorial zu aktualisieren.

Recommended Posts

Kinesis-Datenströme ohne Java-Erfahrung (1)
Kinesis-Datenströme ohne Java-Erfahrung (3.2)
Datenverarbeitung mit der Stream-API von Java 8
Verwenden Sie den PostgreSQL-Datentyp (jsonb) aus Java
Informationen zu CLDR-Gebietsschemadaten, die standardmäßig in Java 9 aktiviert sind
Rufen Sie Java von JRuby aus auf
[Java] Datentyp ①-Basistyp
Greifen Sie über Java auf API.AI zu
Von Java zu Ruby !!
[Java] Hauptdatentypen
Java-Grunddatentypen
Java, das Daten von Android zu Jetson Nanos ROS überspringt
Holen Sie sich Wettervorhersagen von Watson Weather Company Data mit einfachem Java
CData Software Hands-on (Abrufen von Kintone-Daten aus der Java-Konsolenanwendung)
CData Software Hands-on (Twitter-Daten von der Java-Konsolenanwendung abrufen)