Comment stocker des données CSV dans Amazon Kinesis Streams avec une entrée standard

Cet article décrit un script python qui remplit Kinesis Streams avec une entrée standard d'un hôte local ou EC2.

environnement

・ Amazon Linux ・ Langage: python 2.7, shell -Format des données d'entrée: CSV

Le script python à stocker dans Kinesis Streams est décrit ci-dessous. Dans ce script, il est agrégé et stocké à 500 enregistrements / seconde *. .. (Veuillez noter que le sens de l'agrégation ne signifie pas que KPL est utilisé pour agréger un enregistrement.)

script

buffer_insert.py


import sys
import json
import random
import boto3
import time

def create_json(buffered_data, streamname):
    jdat = {}
    dat = []
    jdat["StreamName"] = streamname      
    for rec in buffered_data :
        dat.append({"Data" : rec, "PartitionKey" : str(random.randint(1,1000))})
    jdat["Records"] = dat
    return jdat

if __name__ == '__main__':

  args = sys.argv
  streamname=args[1]

  cnt = 0
  buf = []

  client = boto3.client('kinesis')
  while 1:
      if len(buf) == 500:
          ret = client.put_records(**create_json(buf,streamname ))
          time.sleep(1)
          print ret
          buf = []
      line = sys.stdin.readline()
      if not line:
          break
      buf.append(line[:-1])

Puisque les informations critiques de la clé d'accès et de la clé secrète ne sont pas définies dans le script ci-dessus, veuillez les définir dans client () si nécessaire. Voir le document boto3 ci-dessous pour plus de détails. http://boto3.readthedocs.io/en/latest/guide/configuration.html

L'exécution est la suivante. -Créé par kinesis_streams_test of Streams ・ Entrez les données test.csv

>Courir
cat test.csv | python buffer_insert.py kinesis_streams_test

>résultat
{u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49571046543942031937439475265378143459129484557577879554'}, {u'ShardId': u'shardId-000000000001', u'SequenceNumber': u'49571046543964332682638005888401204447079899259962654738'}, {u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49571046543942031937439475265379352384949099186752585730'}, {u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49571046543942031937439475265380561310768713815927291906'}, {u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49571046543942031937439475265381770236588328445101998082'}, {u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49571046543942031937439475265382979162407943074276704258'}, {u'ShardId': u'shardId-000000000001', u'SequenceNumber': u'49571046543964332682638005888402413372899513889137360914'}, {u'ShardId': u'shardId-000000000001', u'SequenceNumber': u'49571046543964332682638005888403622298719128518312067090'}], 'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': 'e8d1fc47-17eb-1f03-b9e4-17c6595f9a22'}}

Vous trouverez ci-dessous un script pour vérifier s'il peut être stocké dans Kinesis Streams. (Portons-le sur le côté et à partir de l'AWS CLI au lieu de python ... vous pouvez bien sûr l'obtenir à partir de python.)

get_record.sh


#!/bin/bash

stream_name=$1
shard_array_index=${2:-0}
shard_iterator_type=${3:-TRIM_HORIZON}


shard_id=$(aws kinesis describe-stream --stream-name ${stream_name} --query "StreamDescription.Shards[${shard_array_index}].ShardId" --output text)
echo $shard_id
shard_iterator=$(aws kinesis get-shard-iterator --stream-name ${stream_name} --shard-id ${shard_id} --shard-iterator-type ${shard_iterator_type} --query "ShardIterator" --output text)
echo $shard_iterator
aws kinesis get-records --shard-iterator ${shard_iterator}

Puisque les données put_recorded sont codées en Base64, il est nécessaire d'effectuer un traitement de décodage côté consommateur. L'AWS CLI ne prend pas en charge base64, vous devez donc utiliser un décodeur Base64 (tel que https://www.base64decode.org/).

La prochaine fois, je présenterai un script qui agrège les données à l'aide de KPL et les stocke dans Kinesis Streams.

Recommended Posts

Comment stocker des données CSV dans Amazon Kinesis Streams avec une entrée standard
Comment créer des exemples de données CSV avec hypothèse
Entrez les données Zaim dans Amazon Elasticsearch Service avec Logstash
[Django] Comment donner des valeurs d'entrée à l'avance avec ModelForm
Comment gérer les données déséquilibrées
Comment augmenter les données avec PyTorch
[Python] Comment stocker un fichier csv en tant que données de tableau à une dimension
Entrée / sortie de données en Python (CSV, JSON)
Comment convertir csv en tsv dans la CLI
Comment utiliser BigQuery en Python
Comment lire des fichiers CSV avec Pandas
Comment lire les données de problème avec Paiza
Comment gérer les fuites de mémoire dans matplotlib.pyplot
[REAPER] Comment jouer à Reascript avec Python
Résumé de la lecture des données numériques avec python [CSV, NetCDF, Fortran binary]
Comment gérer les erreurs d'exécution dans subprocess.call
Comment récupérer des données de courses de chevaux avec Beautiful Soup
Comment créer des données à mettre dans CNN (Chainer)
Comment utiliser tkinter avec python dans pyenv
Comment lire les données de séries chronologiques dans PyTorch
Écrire des données CSV sur AWS-S3 avec AWS-Lambda + Python
Comment générer "Ketsumaimo" en standard en Python
Comment masquer l'entrée utilisateur dans l'interface graphique PySimple
Comment utiliser fixture dans Django pour saisir des exemples de données associés au modèle utilisateur
Comment améliorer la surveillance des métriques de modèle avec Amazon SageMaker
Comment convertir / restaurer une chaîne avec [] en python
Comment utiliser xgboost: classification multi-classes avec des données d'iris
Comment appliquer des marqueurs uniquement à des données spécifiques avec matplotlib
Comment récupérer des données d'image de Flickr avec Python
Comment faire un calcul de hachage avec Salt en Python
Comment convertir des données détenues horizontalement en données détenues verticalement avec des pandas
Comment gérer l'échec de l'initialisation pyenv dans Fish 3.1.0
Comment faire du zéro-padding sur une ligne avec OpenCV
Comment exécuter des tests avec Python unittest
Comment charger des fichiers dans Google Drive avec Google Colaboratory
Comment accéder avec cache lors de la lecture_json avec pandas
Comment obtenir plus de 1000 données avec SQLAlchemy + MySQLdb
Comment extraire des données qui ne manquent pas de valeur nan avec des pandas
Comment générer un CSV d'en-tête multiligne avec des pandas
Comment convertir un fichier JSON en fichier CSV avec Python Pandas
Comment faire un clic droit en utilisant la saisie au clavier dans RPA?
Comment lire un csv contenant uniquement des entiers en Python
Comment gérer l'exécution de la transaction: échec dans Anaconda
Comment extraire des données qui ne manquent pas de valeur nan avec des pandas
Comment lire du texte avec une entrée standard ou une spécification de nom de fichier comme cat en Python
Comment calculer la somme ou la moyenne des données csv de séries chronologiques en un instant
[Python / Ruby] Comprendre le code Comment obtenir des données en ligne et les écrire au format CSV
[TensorFlow 2 / Keras] Comment exécuter l'apprentissage avec CTC Loss dans Keras
[Go language] Comment obtenir l'entrée du terminal en temps réel
Comment déboguer une bibliothèque Python standard dans Visual Studio
Comment sortir un document au format pdf avec Sphinx
Comment extraire n'importe quel rendez-vous dans Google Agenda avec Python
Comment vérifier le comportement d'ORM avec un fichier avec django
Enregistrer en japonais dans StringProperty dans le magasin de données Google App Engine
[Pour les débutants] Résumé de l'entrée standard en Python (avec explication)
Ingéniosité pour gérer les données avec Pandas de manière à économiser la mémoire
Comment manipuler le DOM dans iframe avec Selenium
[AWS] Comment gérer l'erreur "Point de code non valide" dans CloudSearch
Pour les débutants, comment gérer les erreurs courantes dans les keras