Flux de données Kinesis à partir de zéro expérience Java (3.2)

Suite sur Tutoriel: Analyse en temps réel des données boursières à l'aide de Kinesis Data Streams.

La branche learning-module-1 pour l'apprentissage

// TODO: Implement method

Il y a une partie où les dents manquent. Sur la base de cette branche (il semble qu'elle ne peut pas être construite maintenant), nous allons procéder à l'apprentissage en la comparant avec la branche master.

Implémentation producteur

Commencez par l'étape 4: implémentez le producteur (https://docs.aws.amazon.com/ja_jp/streams/latest/dev/learning-kinesis-module-one-producer.html). Pièce qui écrit des enregistrements à l'aide du kit AWS SDK pour Java (KPL n'est pas utilisé). Comme décrit dans le didacticiel, la procédure est

  1. Entrez le nom du flux et le nom de la région
  2. Créez ClientBuilder
  3. Définissez la région, les informations d'identification et la configuration du client
  4. Configurer le client Kinesis à partir de Client Builder
  5. Vérifiez l'état du flux
  6. Envoyez des transactions aléatoires en streaming toutes les 100 millisecondes

Sera fait. Quelles sont les dents manquantes

Seulement.

writer.StockTradesWriter.SendStockTrade()

/**
 *Envoyer des informations de transaction boursière à un flux donné à l'aide du client Kinesis
 *
 * @Param trade Instance représentant une transaction boursière
 * @param kinesisClient client Kinesis
 * @param streamName Nom du flux
 */
private static void sendStockTrade(StockTrade trade, AmazonKinesis kinesisClient,
            String streamName) {
    byte[] bytes = trade.toJsonAsBytes();
    // Jackson(Bibliothèque JSON)Résolution de la possibilité que les octets deviennent nuls lorsqu'ils ne fonctionnaient pas
    if (bytes == null) {
        LOG.warn("Could not get JSON bytes for stock trade");
        return;
    }

    LOG.info("Putting trade: " + trade.toString());
    PutRecordRequest putRecord = new PutRecordRequest();
    putRecord.setStreamName(streamName);
    //Utiliser le symbole boursier pour la clé de partition
    putRecord.setPartitionKey(trade.getTickerSymbol());
    putRecord.setData(ByteBuffer.wrap(bytes));

    try {
        kinesisClient.putRecord(putRecord);
    } catch (AmazonClientException ex) {
        LOG.warn("Error sending record to Amazon Kinesis.", ex);
    }
}

Implémentation consommateur

Passez ensuite à l'étape 5: implémenter les consommateurs (https://docs.aws.amazon.com/ja_jp/streams/latest/dev/learning-kinesis-module-one-consumer.html). Quelles sont les dents manquantes

Appelez-les pour comprendre [processor.StockTradeProcessor](https://github.com/aws-samples/amazon-kinesis-learning/blob/learning-module-1/src/com/ Je suivrai de la classe amazonaws / services / kinesis / samples / stocktrades / processor / StockTradesProcessor.java).

processor.StockTradeProcessor

Extrayons la partie importante de la méthode main ()

public static void main(String[] args) throws Exception {

    ...

    //Paramètres KCL
    KinesisClientLibConfiguration kclConfig =
            new KinesisClientLibConfiguration(applicationName, streamName, credentialsProvider, workerId)
        .withRegionName(region.getName())
        .withCommonClientConfig(ConfigurationUtils.getClientConfigWithUserAgent());
    
    //Usine de classe pour l'interface IRecordProcessor
    IRecordProcessorFactory recordProcessorFactory = new StockTradeRecordProcessorFactory();

    //Créer un travailleur
    Worker worker = new Worker(recordProcessorFactory, kclConfig);

    int exitCode = 0;
    try {
        //ouvrier en cours d'exécution
        worker.run();
    } catch (Throwable t) {
        LOG.error("Caught throwable while processing data.", t);
        exitCode = 1;
    }
    System.exit(exitCode);

}

L'exécution de KCL peut être grossièrement divisée.

  1. ** Défini via KinesisClientLibConfiguration **
  2. ** Créez une fonction (usine) qui retourne une classe qui implémente l'interface IRecordProcessor **
  3. ** Création et exécution d'un worker avec 1 et 2 comme arguments **

Il y a 3 étapes. Le second d'entre eux, la classe avec l'interface RecordProcessor et ses usines (dans ce cas «StockTradeRecordProcessor» et «StockTradeRecordProcessorFactory»), sont soumis à l'implémentation par l'utilisateur.

processor.StockTradeRecordProcessor Il était vide [processor.StockTradeRecordProcessor.reportStats ()](https://github.com/aws-samples/amazon-kinesis-learning/blob/learning-module-1/src/com/amazonaws/services /kinesis/samples/stocktrades/processor/StockTradeRecordProcessor.java#L88), [processor.StockTradeRecordProcessor.resetStats ()](https://github.com/aws-samples/amazon-kinesis-learning/blob/learning- module-1 / src / com / amazonaws / services / kinesis / samples / stocktrades / processor / StockTradeRecordProcessor.java # L92), [processor.StockTradeRecordProcessor.processRecord ()](https://github.com/aws-samples Cette classe a /amazon-kinesis-learning/blob/learning-module-1/src/com/amazonaws/services/kinesis/samples/stocktrades/processor/StockTradeRecordProcessor.java#L96).

StockTradeRecordProcessor.java


public class StockTradeRecordProcessor implements IRecordProcessor {

    ...

    /**
     *Méthode d'initialisation qui reçoit l'ID de partition à traiter par l'instance
     */
    @Override
    public void initialize(String shardId) {
        LOG.info("Initializing record processor for shard: " + shardId);
        this.kinesisShardId = shardId;
        nextReportingTimeInMillis = System.currentTimeMillis() + REPORTING_INTERVAL_MILLIS;
        nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS;
    }

    /**
     *Méthode de traitement de l'enregistrement récupéré
     */
    @Override
    public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
        for (Record record : records) {
            //Enregistrement de processus
            processRecord(record);
        }

        //Signaler lorsque la période définie pour l'intervalle s'est écoulée
        if (System.currentTimeMillis() > nextReportingTimeInMillis) {
            reportStats();
            resetStats();
            nextReportingTimeInMillis = System.currentTimeMillis() + REPORTING_INTERVAL_MILLIS;
        }

        //point de contrôle à chaque intervalle de point de contrôle
        if (System.currentTimeMillis() > nextCheckpointTimeInMillis) {
            checkpoint(checkpointer);
            nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS;
        }
    }
    private void reportStats() {
        // TODO: Implement method
        System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" +
                stockStats + "\n" +
                "****************************************************************\n");
    }
    private void resetStats() {
        // TODO: Implement method
        stockStats = new StockStats();
    }
    private void processRecord(Record record) {
        // TODO: Implement method
        //Enregistrer du tableau d'octets à l'objet
        StockTrade trade = StockTrade.fromJsonAsBytes(record.getData().array());
        if (trade == null) {
            LOG.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.getPartitionKey());
            return;
        }
        stockStats.addStockTrade(trade);
    }

    /**
     *Méthode appelée lorsque le traitement est terminé ou qu'il n'y a pas de réponse
     */
    @Override
    public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
        LOG.info("Shutting down record processor for shard: " + kinesisShardId);
        //Vous pouvez commencer à traiter les données des fragments enfants par point de contrôle lorsque vous atteignez la fin du fragment.
        if (reason == ShutdownReason.TERMINATE) {
            checkpoint(checkpointer);
        }
    }

    private void checkpoint(IRecordProcessorCheckpointer checkpointer) {
        LOG.info("Checkpointing shard " + kinesisShardId);
        try {
            checkpointer.checkpoint();
        } catch (ShutdownException se) {
            //Ignorer le point de contrôle lorsque l'instance de processeur est arrêtée (échec)
            LOG.info("Caught shutdown exception, skipping checkpoint.", se);
        } catch (ThrottlingException e) {
            //Ne pas vérifier s'il y a une limitation.Envisagez une interruption et réessayez en fonctionnement réel.
            LOG.error("Caught throttling exception, skipping checkpoint.", e);
        } catch (InvalidStateException e) {
            //Erreur s'il y a un problème avec DynamoDB
            LOG.error("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", e);
        }
    }

}

Cette classe est implémentée pour satisfaire l'interface de ʻIRecordProcessor. ʻIRecordProcessor

public void initialize(String shardId)
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer)
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) 

(* L'interface utilisée dans ce tutoriel est la version 1, et maintenant elle est plus sophistiquée version 2. Il y a le dernier / dev / kinesis-record-processor-implementation-app-java.html # kcl-java-interface-v2)).

Parmi ceux-ci, la méthode utilisée dans processRecords () était la cible du tutoriel. Même une application aussi simple peut être facilement implémentée en utilisant KCL en jouant simplement avec processRecords (). Aussi

Lorsqu'un nouvel enregistrement devient disponible, KCL récupère l'enregistrement et appelle le processeur d'enregistrement afin que vous n'ayez pas à vous soucier de la manière de récupérer l'enregistrement à partir de Kinesis Data Streams. Vous n'avez pas non plus à vous soucier du nombre de fragments ou d'instances de consommateurs. Au fur et à mesure que le flux évolue, vous n'avez pas à réécrire votre application pour gérer plusieurs fragments ou instances de consommateurs.

Il y a un mérite à dire.

Supplément

Traitement de la Liste <Enregistrement> `reçue

Ce tutoriel traite:

for (Record record : records) {
    processRecord(record);
}
...
StockTrade trade = StockTrade.fromJsonAsBytes(record.getData().array());

La classe Record elle-même est décrite dans la documentation du kit SDK AWS. Oui. L'appel de getData () renvoie la classe ByteBuffer Puisqu'il viendra, affichez-le dans un tableau d'octets avec [ʻarray () ](https://docs.oracle.com/javase/6/docs/api/java/nio/ByteBuffer.html#array ()) et il [FromJsonAsBytes](https://github.com/aws-samples/amazon-kinesis-learning/blob/master/src/com/amazonaws/services/kinesis/samples/stocktrades/model/StockTrade.java#L88 ) Le convertit en objet Jackson afin qu'il puisse être manipulé. (En interne Jackson's [readValue ()`](https://fasterxml.github.io/jackson-databind/javadoc/2.7/com/fasterxml/jackson/databind/ObjectMapper.html#readValue(byte [],%) Je lis 20java.lang.Class)).)

À propos du point de contrôle

Il indique un point spécifique du flux. ([Détails des données écrites dans DynamoDB](https://docs.aws.amazon.com/ja_jp/streams/latest/dev/kinesis-record-processor-ddb.html#kinesis-record-processor-ddb- table-matières) était facile à comprendre) Le checkpointer passé par le worker à chaque fois` processRecords () ʻest appelé indique où le worker est en train de traiter (code probablement [cette zone](https://github.com/awslabs/amazon- kinesis-client / blob / v1.x / src / main / java / com / amazonaws / services / kinesis / clientlibrary / lib / worker / RecordProcessorCheckpointer.java)). Ce didacticiel consiste à signaler la position de traitement chaque nombre de secondes spécifié. Il est probable que différentes stratégies seront adoptées pour une utilisation à grande échelle.


Cela complète les trois didacticiels et vous pouvez apprendre à utiliser l'API PutRecord dans le kit AWS SDK pour Java et à utiliser KCL facilement. Bien que j'aie pu comprendre le plan, j'ai eu l'impression que l'utilisation détaillée ne pouvait pas être tirée du didacticiel seul, et qu'il était nécessaire de consulter la documentation et le code source lui-même.

Il semble que cela fait longtemps que ce tutoriel lui-même n'a pas été écrit, et vous pouvez voir que l'interface et la version utilisées sont également anciennes. Alors la prochaine fois, j'essaierai de mettre à jour différents endroits selon ce tutoriel.

Recommended Posts

Flux de données Kinesis à partir de zéro expérience Java (1)
Flux de données Kinesis à partir de zéro expérience Java (3.2)
Traitement des données à l'aide de l'API de flux de Java 8
Utiliser le type de données PostgreSQL (jsonb) à partir de Java
À propos des données locales CLDR activées par défaut à partir de Java 9
Appeler Java depuis JRuby
[Java] Type de données ①-Type de base
Accédez à API.AI depuis Java
De Java à Ruby !!
[Java] Principaux types de données
Types de données de base Java
Java qui ignore les données d'Android vers le ROS de Jetson Nano
Obtenez les prévisions météorologiques de Watson Weather Company Data avec Java simple
Logiciel pratique CData (obtenir des données kintone à partir de l'application console Java)
Logiciel pratique CData (obtenir des données Twitter à partir de l'application console Java)