This article describes a Python script that reads standard input on localhost or EC2 and stores it in Kinesis Streams.
・OS: Amazon Linux
・Language: Python 2.7, shell
・Input data format: CSV
The Python script that stores data in Kinesis Streams is shown below. It batches the input and puts roughly 500 records per second. (Note that "batching" here means sending multiple records per put_records call, not KPL-style aggregation into a single record.)
script
buffer_insert.py
import sys
import random
import boto3
import time

def create_json(buffered_data, streamname):
    # Build the kwargs for put_records: one batch of records,
    # each with a random partition key so records spread across shards.
    jdat = {}
    dat = []
    jdat["StreamName"] = streamname
    for rec in buffered_data:
        dat.append({"Data": rec, "PartitionKey": str(random.randint(1, 1000))})
    jdat["Records"] = dat
    return jdat

if __name__ == '__main__':
    args = sys.argv
    streamname = args[1]
    buf = []
    client = boto3.client('kinesis')
    while 1:
        if len(buf) == 500:
            # Send a full batch of 500 records, then wait one second
            # to keep the rate at roughly 500 records / second.
            ret = client.put_records(**create_json(buf, streamname))
            time.sleep(1)
            print ret
            buf = []
        line = sys.stdin.readline()
        if not line:
            break
        buf.append(line[:-1])  # strip the trailing newline
    if buf:
        # Flush any remaining records (fewer than 500) left at EOF.
        print client.put_records(**create_json(buf, streamname))
The script above does not set the access key and secret key credentials, so set them in client() if necessary. See the boto3 documentation below for details. http://boto3.readthedocs.io/en/latest/guide/configuration.html
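If you do set them in the script, one option is to pass them directly to client(). A minimal sketch; the region and key values below are placeholders, not values from this article:

import boto3

# Placeholder values: replace with your own. Prefer environment
# variables or ~/.aws/credentials over hard-coding keys in a script.
client = boto3.client(
    'kinesis',
    region_name='ap-northeast-1',            # placeholder region
    aws_access_key_id='YOUR_ACCESS_KEY',      # placeholder
    aws_secret_access_key='YOUR_SECRET_KEY')  # placeholder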
The execution is as follows.
・Stream created: kinesis_streams_test
・Input data: test.csv
>Run
cat test.csv | python buffer_insert.py kinesis_streams_test
>result
{u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49571046543942031937439475265378143459129484557577879554'}, {u'ShardId': u'shardId-000000000001', u'SequenceNumber': u'49571046543964332682638005888401204447079899259962654738'}, {u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49571046543942031937439475265379352384949099186752585730'}, {u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49571046543942031937439475265380561310768713815927291906'}, {u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49571046543942031937439475265381770236588328445101998082'}, {u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49571046543942031937439475265382979162407943074276704258'}, {u'ShardId': u'shardId-000000000001', u'SequenceNumber': u'49571046543964332682638005888402413372899513889137360914'}, {u'ShardId': u'shardId-000000000001', u'SequenceNumber': u'49571046543964332682638005888403622298719128518312067090'}], 'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': 'e8d1fc47-17eb-1f03-b9e4-17c6595f9a22'}}
The script that checks whether the data was actually stored in Kinesis Streams is shown below. (As a slight detour, it uses the AWS CLI instead of Python; you can of course also fetch the records from Python.)
get_record.sh
#!/bin/bash
stream_name=$1
shard_array_index=${2:-0}               # which shard to read (default: the first)
shard_iterator_type=${3:-TRIM_HORIZON}  # default: start from the oldest record

# Look up the shard ID of the specified shard.
shard_id=$(aws kinesis describe-stream --stream-name ${stream_name} --query "StreamDescription.Shards[${shard_array_index}].ShardId" --output text)
echo $shard_id

# Obtain a shard iterator for that shard, then fetch the records.
shard_iterator=$(aws kinesis get-shard-iterator --stream-name ${stream_name} --shard-id ${shard_id} --shard-iterator-type ${shard_iterator_type} --query "ShardIterator" --output text)
echo $shard_iterator
aws kinesis get-records --shard-iterator ${shard_iterator}
Since the data stored with put_records comes back Base64 encoded, it must be decoded on the consumer side. The AWS CLI does not decode Base64 itself, so you need a separate Base64 decoder (such as https://www.base64decode.org/).
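For reference, a minimal boto3 consumer sketch under the same assumptions as above (stream kinesis_streams_test, reading the first shard from TRIM_HORIZON). Note that, unlike the AWS CLI, boto3 returns the Data field as already-decoded bytes, so no manual Base64 step is needed:

import boto3

client = boto3.client('kinesis')
stream_name = 'kinesis_streams_test'

# Look up the ID of the first shard.
shard_id = client.describe_stream(
    StreamName=stream_name)['StreamDescription']['Shards'][0]['ShardId']

# Get an iterator starting from the oldest record in the shard.
shard_iterator = client.get_shard_iterator(
    StreamName=stream_name,
    ShardId=shard_id,
    ShardIteratorType='TRIM_HORIZON')['ShardIterator']

# Fetch one batch of records; boto3 has already Base64-decoded Data.
resp = client.get_records(ShardIterator=shard_iterator)
for rec in resp['Records']:
    print(rec['Data'])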
Next time, I will introduce a script that aggregates data using KPL and stores it in Kinesis Streams.