Collect and process large amounts of data records in real time. It is possible to continuously acquire and save many tella of data per hour. Data retention period is 24 hours. It can be extended up to 7 days by charging.
A unit of stream throughput. You can increase the throughput of the stream by increasing the shard.
A unit of data stored in a stream. The record consists of a sequence number, a partition key, and a data blob. A data blob is the data itself. Base64 encoded and saved.
--The capacity of one record is 1MB
If you get caught in either one, an error will occur.
―― 2MB / s per shard ―― 5 transactions per shard
If you get caught in either one, an error will occur.
―― 1MB / s per shard --1000 transactions per shard
Install boto with pip.
Obtain the access key and secret key on AWS.
Create the following file as .boto
in the same hierarchy as the program.
.boto
[Credentials]
aws_access_key_id = YOUR_ACCESS_KEY_ID
aws_secret_access_key = YOUR_SECRET_ACCESS_KEY
from boto impor kinesis
#Specify the Kinesis region to use
conn = kinesis.connect_to_region(region_name = 'Region name')
#Get the information of the specified stream
stream = conn.describe_stream('Stream name')
#Get shard information from the stream information you just got
#If there are multiple shards, return in list
shards = stream['StreamDescription']['Shards']
#Get from all shards
for shard in shards:
shard_id = shard['ShardId']
#Get the position to get
#First argument: Stream name
#Second argument: shard id
#Third argument: Read type
# LATEST:Read from the very end of the shard
# TRIM_HORIZON:Read from the very beginning of the shard
# AT_SEQUENCE_NUMBER:Read from the specified sequence number. Specify the sequence number in the 4th argument
# AFTER_SEQUENCE_NUMBER:Read from the next of the specified sequence number. Specify the sequence number in the 4th argument
iterator = conn.get_sahrd_iterator(stream['StreamDescription']['StreamName'], shard_id, shard_iterator_type='LATEST')['ShardIterator']
#Get data from Kinesis
#Acquisition upper limit can be specified with the second argument(limit)
result = conn.get_records(iterator)
print result
I think it will be easier to understand if you print the one that describes the stream.
from boto import kinesis
#Specify the Kinesis region to use
conn = kinesis.connect_to_region(region_name = 'Region name')
#put
conn.put_record('Stream name', data = 'hogehoge', partition_key = 'partition_key')
partition_key
Partition keys are used to isolate data records and route them to different shards in the stream. If you have two shards in your stream, you can use two partition keys to control which data is stored in which shard.
Recommended Posts