How to store CSV data in Amazon Kinesis Streams with standard input

This article describes a Python script that reads standard input, on a local machine or an EC2 instance, and writes it to Kinesis Streams.

environment

・ OS: Amazon Linux
・ Language: Python 2.7, shell
・ Input data format: CSV

The Python script that writes to Kinesis Streams is shown below. It buffers the input lines and sends them in batches, at a rate of 500 records per second. (Note that "aggregation" here means batching records into one request, not aggregating several records into a single record as the KPL does.)

script

buffer_insert.py


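import sys
import random
import boto3
import time

BATCH_SIZE = 500  # number of records sent per put_records call


def create_json(buffered_data, streamname):
    """Build the keyword arguments for put_records from the buffered lines."""
    records = []
    for rec in buffered_data:
        # A random partition key spreads the records across the shards.
        records.append({"Data": rec, "PartitionKey": str(random.randint(1, 1000))})
    return {"StreamName": streamname, "Records": records}


if __name__ == '__main__':
    streamname = sys.argv[1]
    buf = []

    client = boto3.client('kinesis')
    while True:
        line = sys.stdin.readline()
        if line:
            buf.append(line.rstrip('\n'))
        # Send when the buffer is full, or when input ends with records left over.
        if len(buf) == BATCH_SIZE or (not line and buf):
            ret = client.put_records(**create_json(buf, streamname))
            print(ret)
            buf = []
            time.sleep(1)  # wait one second between batches
        if not line:
            break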

The script above does not set any credentials (access key and secret key). If necessary, pass them to boto3.client(), or configure them in one of the other ways described in the boto3 documentation: http://boto3.readthedocs.io/en/latest/guide/configuration.html
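As a minimal sketch of passing credentials explicitly, the helper below builds the keyword arguments for boto3.client(). The function names build_client_kwargs and make_kinesis_client are hypothetical, and the region is whatever your stream lives in. region_name, aws_access_key_id and aws_secret_access_key are parameters of boto3.client(); when the keys are omitted, boto3 falls back to its usual lookup (environment variables, shared credentials file, instance role).

```python
def build_client_kwargs(region_name, access_key=None, secret_key=None):
    """Return keyword arguments for boto3.client('kinesis', ...).

    Hypothetical helper: keys are only included when both are given,
    so boto3's default credential lookup is used otherwise.
    """
    kwargs = {"region_name": region_name}
    if access_key and secret_key:
        kwargs["aws_access_key_id"] = access_key
        kwargs["aws_secret_access_key"] = secret_key
    return kwargs


def make_kinesis_client(region_name, access_key=None, secret_key=None):
    """Create a Kinesis client; boto3 is imported here so the helper above stays dependency-free."""
    import boto3
    return boto3.client(
        "kinesis", **build_client_kwargs(region_name, access_key, secret_key)
    )
```

Hard-coding keys in a script is usually best avoided; on EC2 an instance role generally removes the need to pass them at all.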

The script is run as follows.

・ Stream name: kinesis_streams_test
・ Input data: test.csv

>Run
cat test.csv | python buffer_insert.py kinesis_streams_test

>result
{u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49571046543942031937439475265378143459129484557577879554'}, {u'ShardId': u'shardId-000000000001', u'SequenceNumber': u'49571046543964332682638005888401204447079899259962654738'}, {u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49571046543942031937439475265379352384949099186752585730'}, {u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49571046543942031937439475265380561310768713815927291906'}, {u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49571046543942031937439475265381770236588328445101998082'}, {u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49571046543942031937439475265382979162407943074276704258'}, {u'ShardId': u'shardId-000000000001', u'SequenceNumber': u'49571046543964332682638005888402413372899513889137360914'}, {u'ShardId': u'shardId-000000000001', u'SequenceNumber': u'49571046543964332682638005888403622298719128518312067090'}], 'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': 'e8d1fc47-17eb-1f03-b9e4-17c6595f9a22'}}
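An HTTP 200 status like the one above does not by itself mean every record was stored: PutRecords can reject individual records, for example when a shard is throttled. The response carries a FailedRecordCount field, and each entry in its Records list corresponds, in order, to the entry at the same position in the request, with an ErrorCode on those that failed. The sketch below picks out the entries to resend; the function name records_to_retry is hypothetical and not part of the original script.

```python
def records_to_retry(request_records, response):
    """Return the request entries that put_records reported as failed.

    request_records: the list passed as Records to put_records.
    response: the dict returned by put_records.
    """
    if response.get("FailedRecordCount", 0) == 0:
        return []
    # Results are positional: the i-th result belongs to the i-th request entry.
    return [
        req
        for req, res in zip(request_records, response["Records"])
        if "ErrorCode" in res
    ]
```

In buffer_insert.py this could be called right after put_records, putting the returned entries back into the next batch instead of discarding them.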

The script below checks whether the records were stored in Kinesis Streams. (As a change of pace it uses the AWS CLI rather than Python; you can of course read the records from Python as well.)

get_record.sh


#!/bin/bash
# Usage: get_record.sh STREAM_NAME [SHARD_INDEX] [SHARD_ITERATOR_TYPE]

stream_name=$1
shard_array_index=${2:-0}
shard_iterator_type=${3:-TRIM_HORIZON}

# Look up the ID of the shard at the given index.
shard_id=$(aws kinesis describe-stream --stream-name "${stream_name}" --query "StreamDescription.Shards[${shard_array_index}].ShardId" --output text)
echo "$shard_id"
# Get an iterator for that shard, then read records from it.
shard_iterator=$(aws kinesis get-shard-iterator --stream-name "${stream_name}" --shard-id "${shard_id}" --shard-iterator-type "${shard_iterator_type}" --query "ShardIterator" --output text)
echo "$shard_iterator"
aws kinesis get-records --shard-iterator "${shard_iterator}"
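For comparison, the same three steps can be sketched in Python with boto3. The function name read_shard and its limit default are assumptions for illustration. describe_stream, get_shard_iterator and get_records are the boto3 Kinesis client calls that match the CLI commands above. The client is passed in as an argument so the function can be tried without AWS access. Unlike the CLI, boto3 is understood to return Data as raw bytes, so only a UTF-8 decode is applied here.

```python
def read_shard(client, stream_name, shard_index=0,
               iterator_type="TRIM_HORIZON", limit=100):
    """Read one batch of records from one shard and return them as text lines.

    client: a boto3 Kinesis client, e.g. boto3.client('kinesis').
    """
    # Step 1: find the shard ID at the given index.
    description = client.describe_stream(StreamName=stream_name)
    shard_id = description["StreamDescription"]["Shards"][shard_index]["ShardId"]

    # Step 2: get an iterator that points into that shard.
    iterator = client.get_shard_iterator(
        StreamName=stream_name,
        ShardId=shard_id,
        ShardIteratorType=iterator_type,
    )["ShardIterator"]

    # Step 3: fetch records; Data is assumed to be UTF-8 encoded CSV text.
    response = client.get_records(ShardIterator=iterator, Limit=limit)
    return [record["Data"].decode("utf-8") for record in response["Records"]]
```

A single get_records call returns only one batch; reading a whole shard would mean repeating the call with the NextShardIterator value from each response.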

The data written by put_records comes back Base64 encoded in the get-records output, so it has to be decoded on the consumer side. The AWS CLI does not decode it for you, so pass the Data field through a Base64 decoder (for example the base64 command, or https://www.base64decode.org/).
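The decoding step can also be done with Python's standard library. The sketch below takes the JSON text printed by aws kinesis get-records and returns the decoded Data of each record; the function name decode_records is hypothetical, and the records are assumed to hold UTF-8 text, as the CSV lines in this article do.

```python
import base64
import json


def decode_records(get_records_json):
    """Decode the Base64 Data field of every record in get-records JSON output."""
    response = json.loads(get_records_json)
    return [
        base64.b64decode(record["Data"]).decode("utf-8")
        for record in response["Records"]
    ]
```

The function expects only the JSON from the final get-records call, so the two echo lines in get_record.sh would need to be removed or redirected to stderr before piping its output in.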

Next time, I will introduce a script that aggregates data using KPL and stores it in Kinesis Streams.
