Operate Kinesis with Python

What is a Kinesis stream?

Kinesis streams collect and process large amounts of data records in real time. A stream can continuously ingest and store terabytes of data per hour. Data is retained for 24 hours by default, and the retention period can be extended to up to 7 days for an additional charge.

Shard

A shard is the unit of stream throughput. You can increase the throughput of a stream by increasing its number of shards.

Data record

A data record is the unit of data stored in a stream. A record consists of a sequence number, a partition key, and a data blob. The data blob is the payload itself, and it is Base64-encoded when it is sent to or returned by the Kinesis API.
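To make the record structure concrete, here is a small illustrative model of a record and of the Base64 round trip for its blob. DataRecord, encode_blob and decode_blob are hypothetical names, not boto classes, and the sequence number "1" is a placeholder (real sequence numbers are assigned by Kinesis).

```python
import base64
from typing import NamedTuple


class DataRecord(NamedTuple):
    """Illustrative model of a data record (hypothetical, not a boto class)."""
    sequence_number: str  # assigned by Kinesis when the record is put
    partition_key: str    # chosen by the producer
    data: bytes           # the data blob: the raw payload


def encode_blob(data: bytes) -> str:
    """Return the Base64 text form of a data blob."""
    return base64.b64encode(data).decode("ascii")


def decode_blob(text: str) -> bytes:
    """Recover the original bytes from their Base64 text form."""
    return base64.b64decode(text)


record = DataRecord("1", "partition_key", b"hogehoge")  # "1" is a placeholder
print(encode_blob(record.data))  # aG9nZWhvZ2U=
```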

Kinesis Limits

- The maximum size of one record is 1 MB

Reading

If either of these limits is exceeded, the request fails with an error (ProvisionedThroughputExceededException).

- 2 MB/s per shard
- 5 transactions (GetRecords calls) per second per shard

Writing

If either of these limits is exceeded, the request fails with an error (ProvisionedThroughputExceededException).

- 1 MB/s per shard
- 1,000 records per second per shard
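These per-shard limits turn stream sizing into simple arithmetic: the stream needs enough shards to cover whichever limit is tightest. A rough sketch, assuming traffic is spread evenly over the shards by well-distributed partition keys; shards_needed is a hypothetical helper. It covers only the MB/s and records/s limits. The 5 reads per second per shard limit is shared by all consumers and is controlled by how often they poll.

```python
import math

# Per-shard limits from the lists above.
WRITE_MB_PER_SEC = 1.0
WRITE_RECORDS_PER_SEC = 1000
READ_MB_PER_SEC = 2.0


def shards_needed(write_mb_per_sec, write_records_per_sec, read_mb_per_sec):
    """Smallest shard count that keeps every per-shard limit satisfied.

    read_mb_per_sec is the total read rate of all consumers combined,
    because they share each shard's read limit.
    """
    return max(
        1,
        math.ceil(write_mb_per_sec / WRITE_MB_PER_SEC),
        math.ceil(write_records_per_sec / WRITE_RECORDS_PER_SEC),
        math.ceil(read_mb_per_sec / READ_MB_PER_SEC),
    )


# 3.5 MB/s of writes in 2,000 records/s, read by two consumers (7 MB/s total)
print(shards_needed(3.5, 2000, 7.0))  # 4
```

Here the write volume and the combined read volume each call for 4 shards, while the record rate alone would need only 2, so the byte limits decide.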

Getting and putting records with Kinesis in Python

Preparation

Install boto with pip (pip install boto) and obtain an access key ID and a secret access key from AWS. Then save the following as .boto in your home directory (~/.boto), one of the locations boto reads its configuration from by default.

.boto


[Credentials]
aws_access_key_id = YOUR_ACCESS_KEY_ID
aws_secret_access_key = YOUR_SECRET_ACCESS_KEY
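If boto does not seem to pick up your keys, a quick check is to parse the file yourself. Below is a hypothetical helper (not part of boto) built on Python's configparser; pass the path you saved the file to, with ~/.boto as the default. Interpolation is disabled so that any "%" in a value is read literally.

```python
import configparser
import os


def read_boto_credentials(path=os.path.expanduser("~/.boto")):
    """Return (access_key_id, secret_access_key) from a boto-style config file.

    Raises FileNotFoundError if the file cannot be read, and KeyError if the
    [Credentials] section or either key is missing.
    """
    parser = configparser.ConfigParser(interpolation=None)
    if not parser.read(path):  # read() returns the list of files it parsed
        raise FileNotFoundError(path)
    section = parser["Credentials"]
    return section["aws_access_key_id"], section["aws_secret_access_key"]
```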

get

from boto import kinesis

# Specify the Kinesis region to use (e.g. 'ap-northeast-1')
conn = kinesis.connect_to_region(region_name='Region name')

# Get the description of the specified stream
stream = conn.describe_stream('Stream name')
stream_name = stream['StreamDescription']['StreamName']

# Get the shard information from the stream description
# It is a list with one entry per shard
shards = stream['StreamDescription']['Shards']

# Read from every shard
for shard in shards:
    shard_id = shard['ShardId']

    # Get a shard iterator, which marks the position to start reading from
    # First argument: stream name
    # Second argument: shard ID
    # Third argument: shard iterator type
    #     LATEST: start just after the most recent record in the shard
    #     TRIM_HORIZON: start at the oldest record still retained in the shard
    #     AT_SEQUENCE_NUMBER: start at the given sequence number (pass it as the 4th argument)
    #     AFTER_SEQUENCE_NUMBER: start right after the given sequence number (pass it as the 4th argument)
    iterator = conn.get_shard_iterator(stream_name, shard_id, shard_iterator_type='LATEST')['ShardIterator']

    # Get records from Kinesis
    # The maximum number of records to return can be set with the second argument (limit)
    # With LATEST, only records put after the iterator was created are returned,
    # so this first call may well come back with an empty 'Records' list
    result = conn.get_records(iterator)
    print(result)

Printing the stream description (the stream variable) as well makes it easier to see how these responses are structured.
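The loop above makes a single get_records call per shard. A consumer normally keeps reading by passing the NextShardIterator from each response into the next call. The sketch below shows that pattern for one shard. read_shard and its max_polls / poll_interval parameters are hypothetical names, conn is assumed to be the boto 2 connection created above, and the one-second pause is simply a conservative way to stay under the 5 reads per second per shard limit. It starts from TRIM_HORIZON so that records already in the shard are returned.

```python
import time


def read_shard(conn, stream_name, shard_id, max_polls=10, poll_interval=1.0):
    """Poll one shard from its oldest record, following NextShardIterator.

    conn is assumed to be a boto 2 KinesisConnection (or any object with the
    same get_shard_iterator / get_records methods). Returns the record data.
    """
    iterator = conn.get_shard_iterator(
        stream_name, shard_id, 'TRIM_HORIZON')['ShardIterator']
    records = []
    for _ in range(max_polls):
        result = conn.get_records(iterator, limit=100)
        records.extend(r['Data'] for r in result['Records'])
        iterator = result.get('NextShardIterator')
        if iterator is None:  # the shard has been closed (e.g. by resharding)
            break
        time.sleep(poll_interval)  # stay under 5 GetRecords calls/s per shard
    return records
```

For example, read_shard(conn, stream_name, shards[0]['ShardId']) reads the first shard of the stream described above. A long-running consumer would loop without max_polls and save the last sequence number so it can resume with AFTER_SEQUENCE_NUMBER.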

put

from boto import kinesis

# Specify the Kinesis region to use (e.g. 'ap-northeast-1')
conn = kinesis.connect_to_region(region_name='Region name')

# Put one record: stream name, data, partition key
conn.put_record('Stream name', data='hogehoge', partition_key='partition_key')
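To send many records at once, Kinesis also has a PutRecords operation that accepts up to 500 records per call. Below is a minimal sketch, assuming your boto version's KinesisConnection provides put_records(records, stream_name) (older releases only have put_record); put_in_batches is a hypothetical helper. PutRecords can partially fail, so the helper collects the records whose result entries carry an ErrorCode instead of assuming the whole batch succeeded.

```python
def put_in_batches(conn, stream_name, items, batch_size=500):
    """Send (partition_key, data) pairs with PutRecords, at most 500 per call.

    Returns the (partition_key, data) pairs that Kinesis rejected, so the
    caller can retry them later.
    """
    failed = []
    for start in range(0, len(items), batch_size):
        batch = items[start:start + batch_size]
        records = [{'Data': data, 'PartitionKey': key} for key, data in batch]
        response = conn.put_records(records, stream_name)
        if response['FailedRecordCount']:
            # Result entries come back in the same order as the request.
            for item, result in zip(batch, response['Records']):
                if 'ErrorCode' in result:
                    failed.append(item)
    return failed
```

Failed entries are typically ProvisionedThroughputExceededException errors, so retrying them after a short wait is usually enough.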

partition_key

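Partition keys are used to group data records and route them to different shards in the stream. Kinesis computes the MD5 hash of the partition key and stores the record in the shard whose hash key range contains that value, so records with the same partition key always go to the same shard. With two shards, two different partition keys are not guaranteed to land in different shards; to control placement exactly, pass an explicit hash key (the explicit_hash_key argument of put_record).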
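The routing rule can be reproduced locally, which is handy for checking how your partition keys spread over the shards. A sketch with hypothetical helper names hash_key and find_shard_id: it hashes the key with MD5, reads the digest as an unsigned 128-bit integer, and looks for the shard whose HashKeyRange (as returned by describe_stream) contains it. It assumes the key is hashed as UTF-8, and it skips closed shards, which describe_stream still lists after resharding and which carry an EndingSequenceNumber.

```python
import hashlib


def hash_key(partition_key):
    """MD5 of the partition key (assumed UTF-8) as an unsigned 128-bit int."""
    return int(hashlib.md5(partition_key.encode("utf-8")).hexdigest(), 16)


def find_shard_id(partition_key, shards):
    """Return the ID of the open shard whose hash key range contains the key.

    shards is the list found at stream['StreamDescription']['Shards'].
    Returns None if no open shard matches.
    """
    h = hash_key(partition_key)
    for shard in shards:
        if "EndingSequenceNumber" in shard["SequenceNumberRange"]:
            continue  # closed shard left over from resharding
        rng = shard["HashKeyRange"]
        if int(rng["StartingHashKey"]) <= h <= int(rng["EndingHashKey"]):
            return shard["ShardId"]
    return None
```

For example, find_shard_id('partition_key', stream['StreamDescription']['Shards']) uses the stream description from the get example.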

