Kinesis Data Streams from zero Java experience (1)

I have a need, so I will try to handle Kinesis in Java.

--AWS is touched, but scripting languages (Python, Javascript) are the main

A study record of an engineer.

Amazon Kinesis Data Streams Overview

img From Developer Documentation

Roughly speaking, a queue service for large amounts of data. There are other queue services such as SQS and Amazon MQ, and more specifically AWS IoT and Step Functions, but Kinesis is a service that focuses especially on large amounts of data.

--Ability increase / decrease due to shard increase / decrease --Saving records for a certain period of time --Linkage with loading and analysis services (Firehose, Analytics) --Closed access by Private Link

I think that is a big feature. further

--The library on the writing side (Kinesis Producer Library, KPL), the library on the reading side (Kinesis Client Library, KCL .com / awslabs / amazon-kinesis-client))

One of the features is that it is done. Of course, like other AWS services, it is an API call, so you can use it from the AWS CLI or AWS SDK. On the other hand, using KPL and KCL [advantages in terms of performance and cooperation between both](https://docs.aws.amazon.com/ja_jp/streams/latest/dev/developing-producers-with-kpl.html #developing-producers-with-kpl-advantage). This KPL and KCL are written in Java. Therefore, I decided to learn how to use it in Java. (To be precise, KPL wraps modules written in C ++ in Java Or KCL [Use Java wrapper with Python, Node, etc.](https://docs.aws.amazon.com/ja_jp/streams/latest/dev/developing-consumers-with-kcl.html#kinesis -record-processor-overview-kcl) is also possible)

Official tutorial

Let's try Tutorial: Visualize Web Traffic Using Amazon Kinesis Data Streams think. Basically, you can proceed by just clicking, so make a note of the points you are interested in while reading the above document.

Environment

Environment just by running this CFn template Is completed. Regarding Parameters ʻApplication Archive, the default is v1.1.1 as of July 4, 2018, but since the latest version is 1.1.2, https://github.com/aws-samples/amazon-kinesis Rewrite to -data-visualization-sample / releases / download / v1.1.2 / amazon-kinesis-data-visualization-sample-1.1.2-assembly.zip` (here You may want to check the latest version at aws-samples / amazon-kinesis-data-visualization-sample / releases)). Enter appropriate values for key pair and SSH address restrictions. The status will change to CREATE_COMPLETE in about 5 minutes, so check the CFn output from the OUTPUTS tab.

Checking the application

Check the graph

If you access the address displayed in the URL item of the Outputs tab (the visualization application is hosted on EC2), you will see the graph. It seems to be a simulation of an application in which the Web server sequentially publishes the referer of the accessing user to Kinesis, polls the result, and visualizes and aggregates it in real time.

Kinesis settings

I think that a DataStream named KinesisDataVisSampleApp has been created, so let's check each setting value. Take a look at the Details tab.

Shards

Shard is the number of divisions of the record to be written

Each shard can support up to 5 transactions per second for reads and a maximum total rate of 2 MB for data reads per second. The shard also supports up to 1,000 records per second for writes and a maximum total data write rate of 1 MB per second (including partition keys). The total capacity of the stream is the total capacity of the shard.

As stated in the official document, the performance per shard is fixed, and the performance is adjusted by increasing or decreasing this. Basically, the larger the value, the better the performance, but the partition key (described later) must be sufficiently larger than the number of shards. I think that the application created this time is set to 2.

Server-side encryption When turned on, data can be encrypted using KMS.

Data retention period The data can be retained in Kinesis and can be set in the range of 24 to 168 hours. (Of course, the longer it is, the more it will be charged)

Shard level metrics You can see the metrics at the shard level, which helps you see if the data is efficiently distributed across shards. This also incurs an additional charge.

Monitoring from the console

If you look at the Monitoring tab, you will see a graph of each metric. You only need to note that the actual values are shown in blue and the items shown in red indicate the limits at the current shard value (only the red line is shown). It's working, but it's not actually working)

Data producer

The side that writes to Kinesis is called the producer. This application simulates pushing the referer of the person who accessed it, but the address is randomly selected from 6 URLs and thrown at Kinesis.

Here There is a code on the producer side in, so I will check it and see it.

(I don't understand Java at all **, so I'll comment on it **)

HttpReferrerKinesisPutter.java


// package:Namespace dividers, apparently use owned domains to prevent conflicts with anyone in the world...
package com.amazonaws.services.kinesis.samples.datavis.producer;
// import:Similar to Python, the last (IOException on this line) is globally introduced in the namespace in this file
import java.io.IOException;  // java.The standard library starts with
import java.nio.ByteBuffer;  //Buffer (nio: non-It seems to be blocking io, I see)
import java.util.concurrent.TimeUnit;  //Time convenience operation library

import org.apache.commons.logging.Log;  // commons-A library that provides a unified logging interface, which seems to be logging
import org.apache.commons.logging.LogFactory;  //Factory used as a set with ↑

// AWS SDK for Java
import com.amazonaws.AmazonClientException;
import com.amazonaws.services.kinesis.AmazonKinesis;  
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
import com.amazonaws.services.kinesis.model.PutRecordRequest;
import com.amazonaws.services.kinesis.samples.datavis.model.HttpReferrerPair;  //Referrer generator (namespace is complicated, but samples and below are in the code of this tutorial)

import com.fasterxml.jackson.databind.ObjectMapper;  // Jackson:JSON parser library for Java

/**
 *Send HTTP referrer pair to Kinesis
 */
public class HttpReferrerKinesisPutter {
    private static final Log LOG = LogFactory.getLog(HttpReferrerKinesisPutter.class);  //Log, seems to eat his own class

    private HttpReferrerPairFactory referrerFactory;  //Factory for referrer generation
    private AmazonKinesis kinesis;  // AWS SDK
    private String streamName;

    private final ObjectMapper JSON = new ObjectMapper();  //Convert Java object ↔︎ JSON through this guy

    //Write to Kinesis after receiving the referrer generation factory, AWS SDK, and the name of the destination Kinesis Stream.
    //In Java, it becomes a constructor with a method declaration that has the same name as the class name and no return value.
    public HttpReferrerKinesisPutter(HttpReferrerPairFactory pairFactory, AmazonKinesis kinesis, String streamName) {
        //Argument error handling
        if (pairFactory == null) {
            throw new IllegalArgumentException("pairFactory must not be null");
        }
        if (kinesis == null) {
            throw new IllegalArgumentException("kinesis must not be null");
        }
        if (streamName == null || streamName.isEmpty()) {
            throw new IllegalArgumentException("streamName must not be null or empty");
        }
        this.referrerFactory = pairFactory;  //Hmmm class
        this.kinesis = kinesis;
        this.streamName = streamName;
    }

    //Something like ↓ is called Javadoc@Documents can be generated by annotating with (docstring and Sphinx-like)
    /**
     *Send a fixed number of HTTP referrer pairs to Kinesis. These are sent sequentially
     *If you want throughput, multiple{@link HttpReferrerKinesisPutter}Use s
     *
     * @param n Number of pairs sent to Kinesis
     * @param delayBetweenRecords Wait time between record transmissions, ignored if less than or equal to 0
     * @param unitForDelay The unit of time used as the wait time
     *
     * @throws InterruptedException Exception when interrupted before sending the next pair
     */
    public void sendPairs(long n, long delayBetweenRecords, TimeUnit unitForDelay) throws InterruptedException {
        for (int i = 0; i < n && !Thread.currentThread().isInterrupted(); i++) {  //Get the current thread with currentThread
            sendPair();  //Send
            Thread.sleep(unitForDelay.toMillis(delayBetweenRecords));  //Wait
        }
    }

    /**
     *Send HTTP referrer pairs to Kinesis endlessly, stop only when interrupted
     *Multiple when you want throughput{@link HttpReferrerKinesisPutter}Consider using s
     *
     * @param delayBetweenRecords Wait time between record transmissions, ignored if less than or equal to 0
     * @param unitForDelay The unit of time used as the wait time
     *
     * @throws InterruptedException Exception when interrupted before sending the next pair
     */
    public void sendPairsIndefinitely(long delayBetweenRecords, TimeUnit unitForDelay) throws InterruptedException {
        while (!Thread.currentThread().isInterrupted()) {
            sendPair();
            if (delayBetweenRecords > 0) {
                Thread.sleep(unitForDelay.toMillis(delayBetweenRecords));
            }
        }
    }

    /**
     *Send a single pair to Kinesis using PutRecord
     */
    private void sendPair() {
        HttpReferrerPair pair = referrerFactory.create();  //Generate a pair of referrers
        byte[] bytes;  //This declares "byte type list bytes"
        try {
            bytes = JSON.writeValueAsBytes(pair);  //UTF the pair-8 Encoded byte array and JSON (serialized)
        } catch (IOException e) {
            LOG.warn("Skipping pair. Unable to serialize: '" + pair + "'", e);
            return;
        }

        PutRecordRequest putRecord = new PutRecordRequest();
        putRecord.setStreamName(streamName);
        //By using the resource as a partition key, the total of the given resources can be calculated accurately.
        putRecord.setPartitionKey(pair.getResource());
        putRecord.setData(ByteBuffer.wrap(bytes));  //Probably, whatever the bytes are, reserve memory for the time being and set?Investigation required
        //Do not send SequenceNumberForOrdering as the order does not matter to this application
        putRecord.setSequenceNumberForOrdering(null);

        try {
            kinesis.putRecord(putRecord);
        } catch (ProvisionedThroughputExceededException ex) {  //When throughput is exceeded
            //Output if log is ON
            if (LOG.isDebugEnabled()) {
                LOG.debug(String.format("Thread %s's Throughput exceeded. Waiting 10ms", Thread.currentThread().getName()));
            }
            //Wait 10 seconds
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        } catch (AmazonClientException ex) {
            LOG.warn("Error sending record to Amazon Kinesis.", ex);
        }
    }
}

This is [HttpReferrerStreamWriter.java](https://github.com/aws-samples/amazon-kinesis-data-visualization-sample/blob/master/src/main/java/com/amazonaws/services/kinesis/samples/ I am writing by calling from the main class of datavis / HttpReferrerStreamWriter.java). It's written properly, so it's long, but all you have to do is write the data to Kinesis and then run it endlessly. Although I am a producer, I am writing using the AWS SDK without using KPL.

Data consumer

On the contrary, the side that acquires data is called a data consumer. In this application, it is persisted by acquiring and aggregating from the data stream for a fixed number of seconds and writing it to Dynamo (it seems that the Web application will visualize it after that).

This uses KCL instead of calling the API directly. It's a bit long, but [CountingRecordProcessor.java](https://github.com/aws-samples/amazon-kinesis-data-visualization-sample/blob/master/src/main/java/com/amazonaws/services/kinesis/ Let's take a look at samples / datavis / kcl / CountingRecordProcessor.java).

CountingRecordProcessor.java


package com.amazonaws.services.kinesis.samples.datavis.kcl;

import java.io.IOException;
import java.util.List;
import java.util.Map;
// concurrent:Library for multithreading
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
// kcl
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.samples.datavis.kcl.counter.SlidingWindowCounter;
import com.amazonaws.services.kinesis.samples.datavis.kcl.persistence.CountPersister;
import com.amazonaws.services.kinesis.samples.datavis.kcl.timing.Clock;
import com.amazonaws.services.kinesis.samples.datavis.kcl.timing.NanoClock;
import com.amazonaws.services.kinesis.samples.datavis.kcl.timing.Timer;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * (HttpReferrerPair -> count(pair))Calculation of mapping within a fixed time width. Counts are calculated during a given interval
 * 
 * @param <T>Record types that this processor can count: <>Express generics with(Used when creating an object that takes a different type as an argument)
 */
public class CountingRecordProcessor<T> implements IRecordProcessor {  //It is necessary to implement the interface in implements and IRecordProcessor in KCL.
    private static final Log LOG = LogFactory.getLog(CountingRecordProcessor.class);

    //Lock to use the timer
    private static final Clock NANO_CLOCK = new NanoClock();
    //Timer for scheduling checkpoints
    private Timer checkpointTimer = new Timer(NANO_CLOCK);

    //JSON object mapper for deserializing records
    private final ObjectMapper JSON;

    //Interval until the distinct count is calculated
    private int computeIntervalInMillis;
    //Total time expected when calculating the total
    private int computeRangeInMillis;

    //Counter to hold the count per interval
    private SlidingWindowCounter<T> counter;

    //Shards that this processor does the calculations
    private String kinesisShardId;

    //Schedule count updates at a fixed rate (computeIntervalInMillis) for different threads
    private ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(1);

    //Implemented count persistence for each interval
    private CountPersister<T> persister;

    private CountingRecordProcessorConfig config;

    //The type of record you will receive as JSON
    private Class<T> recordType;

    /**
     *Generate new processor
     *
     * @param config config of this record processor
     * @param recordType UTF-8 Record type that will be received as a JSON string
     * @param persister Counts that will be persisted to this record processor
     * @param computeRangeInMillis The range to calculate the distinct count
     * @param computeIntervalInMillis Interval for calculating all counts for all time
     */
    public CountingRecordProcessor(CountingRecordProcessorConfig config,
            Class<T> recordType,
            CountPersister<T> persister,
            int computeRangeInMillis,
            int computeIntervalInMillis) {
        if (config == null) {
            throw new NullPointerException("config must not be null");
        }
        if (recordType == null) {
            throw new NullPointerException("recordType must not be null");
        }
        if (persister == null) {
            throw new NullPointerException("persister must not be null");
        }
        if (computeRangeInMillis <= 0) {
            throw new IllegalArgumentException("computeRangeInMillis must be > 0");
        }
        if (computeIntervalInMillis <= 0) {
            throw new IllegalArgumentException("computeIntervalInMillis must be > 0");
        }
        if (computeRangeInMillis % computeIntervalInMillis != 0) {
            throw new IllegalArgumentException("compute range must be evenly divisible by compute interval to support "
                    + "accurate intervals");
        }

        this.config = config;
        this.recordType = recordType;
        this.persister = persister;
        this.computeRangeInMillis = computeRangeInMillis;
        this.computeIntervalInMillis = computeIntervalInMillis;

        //Create an object mapper for deserialized records, ignoring unknown properties
        JSON = new ObjectMapper();
        JSON.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    }

    @Override  //Checks annotations, implementation spelling mistakes, etc.
    public void initialize(String shardId) {
        kinesisShardId = shardId;
        resetCheckpointAlarm();

        persister.initialize();

        //Create a sliding window large enough to hold the entire range of counts for each interval
        counter = new SlidingWindowCounter<>((int) (computeRangeInMillis / computeIntervalInMillis));

        //Generate scheduled tasks that perform calculations and persist counts for every computeIntervalInMillis
        scheduledExecutor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                //Synchronize the counters, so stop advancing the interval while creating checkpoints
                synchronized (counter) {
                    try {
                        advanceOneInterval();
                    } catch (Exception ex) {
                        LOG.warn("Error advancing sliding window one interval (" + computeIntervalInMillis
                                + "ms). Skipping this interval.", ex);
                    }
                }
            }
        },
                TimeUnit.SECONDS.toMillis(config.getInitialWindowAdvanceDelayInSeconds()),
                computeIntervalInMillis,
                TimeUnit.MILLISECONDS);
    }

    /**
     *Advances the internal sliding window counter by 1 at intervals, activates count persistence if the window is full
     */
    protected void advanceOneInterval() {
        Map<T, Long> counts = null;
        synchronized (counter) {
            //Persistence the count only when holding data for the entire range. No need to count each part at the start of the process.
            if (shouldPersistCounts()) {
                counts = counter.getCounts();
                counter.pruneEmptyObjects();
            } else {
                if (LOG.isDebugEnabled()) {
                    LOG.debug(String.format("We have not collected enough interval samples to calculate across the "
                            + "entire range from shard %s. Skipping this interval.", kinesisShardId));
                }
            }
            //Advance the window "one needle"
            counter.advanceWindow();
        }
        //Data persistence if you have the entire range
        if (counts != null) {
            persister.persist(counts);
        }
    }

    @Override
    public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
        for (Record r : records) {
            // Deserialize each record as an UTF-8 encoded JSON String of the type provided
            //UTF each record for the passed type-Deserialize to JSON string of 8
            T pair;
            try {
                pair = JSON.readValue(r.getData().array(), recordType);
            } catch (IOException e) {
                LOG.warn("Skipping record. Unable to parse record into HttpReferrerPair. Partition Key: "
                        + r.getPartitionKey() + ". Sequence Number: " + r.getSequenceNumber(),
                        e);
                continue;
            }
            //Increment the counter for a new pair. Synchronous because there are other threads that read from the counter and calculate the total at each interval.
            synchronized (counter) {
                counter.increment(pair);
            }
        }

        //Checkpoint at that time
        if (checkpointTimer.isTimeUp()) {
            //Lock to prevent additional calculations from being performed during checkpoints
            synchronized (counter) {
                checkpoint(checkpointer);
                resetCheckpointAlarm();
            }
        }
    }

    /**
     *Sufficient sample data should be collected in all ranges of windows before any count is persisted.
     *
     * @return {@code true}Do you collect all counts for all ranges and collect enough data for persistence?
     */
    private boolean shouldPersistCounts() {
        return counter.isWindowFull();
    }

    @Override
    public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
        LOG.info("Shutting down record processor for shard: " + kinesisShardId);

        scheduledExecutor.shutdown();
        try {
            //Wait up to 30 seconds for the executor service task to complete
            if (!scheduledExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
                LOG.warn("Failed to properly shut down interval thread pool for calculating interval counts and persisting them. Some counts may not have been persisted.");
            } else {
                //Checkpoint only when the thread pool is successfully shut down
                //It's important to checkpoint after reaching the end of the shard, which allows the child shard to start processing the data
                if (reason == ShutdownReason.TERMINATE) {
                    synchronized (counter) {
                        checkpoint(checkpointer);
                    }
                }
            }
        } catch (InterruptedException ie) {
            //Do not checkpoint if clean shutdown fails
            scheduledExecutor.shutdownNow();
            //Treat this error like a host or process crash or a JVM Abort
            LOG.fatal("Couldn't successfully persist data within the max wait time. Aborting the JVM to mimic a crash.");
            System.exit(1);
        }
    }

    /**
     *Set timer for next checkpoint
     */
    private void resetCheckpointAlarm() {
        checkpointTimer.alarmIn(config.getCheckpointIntervalInSeconds(), TimeUnit.SECONDS);
    }

    /**
     *Checkpoint with retry
     *
     * @param checkpointer
     */
    private void checkpoint(IRecordProcessorCheckpointer checkpointer) {
        LOG.info("Checkpointing shard " + kinesisShardId);
        for (int i = 0; i < config.getCheckpointRetries(); i++) {
            try {
                //First checkpoint the persister to ensure that all calculated counts have been persisted
                persister.checkpoint();
                checkpointer.checkpoint();
                return;
            } catch (ShutdownException se) {
                //Ignore checkpoints if processor instance shuts down (failover)
                LOG.info("Caught shutdown exception, skipping checkpoint.", se);
                return;
            } catch (ThrottlingException e) {
                //Transient failure)In case of, go back and try again
                if (i >= (config.getCheckpointRetries() - 1)) {
                    LOG.error("Checkpoint failed after " + (i + 1) + "attempts.", e);
                    break;
                } else {
                    LOG.info("Transient issue when checkpointing - attempt " + (i + 1) + " of "
                            + config.getCheckpointRetries(),
                            e);
                }
            } catch (InvalidStateException e) {
                //Show problems with DynamoDB (check tables and IOPS)
                LOG.error("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", e);
                break;
            } catch (InterruptedException e) {
                LOG.error("Error encountered while checkpointing count persister.", e);
                //Failed during retry
            }
            try {
                Thread.sleep(config.getCheckpointBackoffTimeInSeconds());
            } catch (InterruptedException e) {
                LOG.debug("Interrupted sleep", e);
            }
        }
        //Treat this error like a host or process crash or a JVM Abort
        LOG.fatal("Couldn't successfully persist data within max retry limit. Aborting the JVM to mimic a crash.");
        System.exit(1);
    }
}

Advanced (advanced) It's a little difficult for Java beginners to start, but [Implement IRecordProcessor method](https://docs.aws.amazon.com/ja_jp/streams/latest/dev/kinesis- record-processor-implementation-app-java.html # kinesis-record-processor-implementation-interface-java) is done in this file (please forgive the comment Japanese translation is difficult).

--ʻInitialize () : Initialize, identify shard to process --processRecords (): Process records -- shutdown () `: End of processing

Implements the three methods of to specify a series of processes, and processRecord is passed the checkpoint () method to track the records that have already been processed. These control which part of which shard you are working with.

Actually, while sliding the window for 2 seconds, it seems that it counts and saves the top 3 viewers (I still can not fully understand this part, future work.)

DynamoDB KCL creates a table in DynamoDB to maintain application state information (checkpoint and shard-worker correspondence). When you actually run the application, you can see that the DynamoDB table has been created. One is for storing the count result, and the other is the table for this state management (KinesisDataVisSampleApp-KCLDynamoDBTable- [randomString]). If you look inside, you will find various keys such as leaseKey and checkpoint. We use these to manage the state of which sentence is being read. There is Description of each key in this document.

For the time being, I will skip the understanding of the contents, but it will be necessary to change the shard according to the reading speed.


I've studied so far, but [Tutorial: Getting Started with Amazon Kinesis Data Streams Using AWS CLI](https://docs.aws.amazon.com/ja_jp/streams/latest/dev/kinesis-tutorial- cli.html) looks like the next tutorial, so I'll try to touch it via CLI.

Recommended Posts

Kinesis Data Streams from zero Java experience (1)
Kinesis Data Streams from zero Java experience (3.1)
Kinesis Data Streams from zero Java experience (3.2)
Data processing using stream API from Java 8
Use PostgreSQL data type (jsonb) from Java
About CLDR locale data enabled by default from Java 9
Call Java from JRuby
Changes from Java 8 to Java 11
Sum from Java_1 to 100
Java Silver passing experience
Eval Java source from Java
[Java] Data type ①-Basic type
Access API.AI from Java
From Java to Ruby !!
[Java] Main data types
Java basic data types
Java to fly data from Android to ROS of Jetson Nano
Get weather forecasts from Watson Weather Company Data in simple Java
CData Software hands-on (getting kintone data from Java console application)
CData Software Hands-on (Get Twitter data from Java console application)