Comme le titre l'indique, Redshift dispose d'un entrepôt de données, qui est généralement traité par ELT, mais il y a des cas où le traitement des données par programmation est nécessaire.
En utilisant UNLOAD de Redshift, vous pouvez créer un fichier gzip de Redshift vers S3 avec le résultat de SQL, il est donc dit qu'il sera traité par Lambda avec l'événement put sur S3 comme déclencheur et téléchargé à nouveau sur S3 dans l'état de gzip. J'ai essayé de faire ça.
UNLOAD Lambda dispose actuellement d'un maximum de 3008 Mo. Un tel traitement augmentera inévitablement la quantité de mémoire utilisée à mesure que la taille du fichier augmente. Par conséquent, ajustez la taille de fichier à transmettre à Lambda en définissant le paramètre MAXFILESIZE. C'est un cas par cas complet, mais cette fois je l'ai mis à 50 Mo.
Les paramètres de déclenchement sont omis.
import json
import boto3
import urllib.parse
import os
import sys
import csv
import re
import traceback
import gzip
import subprocess
s3client = boto3.client('s3')
s3resource = boto3.resource('s3')
SEP = '\t'
L_SEP = '\n'
S3OUTBACKET='XXXXXXXX'
S3OUTBASE='athena/preprocessing/XXXXXXtmp/'
def lambda_handler(event, context):
bucket = event['Records'][0]['s3']['bucket']['name']
key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
taragetfile=os.path.split(key)[1]
outputprefixA=os.path.split(key)[0].split("/")[-1]
outputprefixB=os.path.split(key)[0].split("/")[-2]
outputdata = "";
try:
dlfilename ='/tmp/'+key.replace("/","")
s3client.download_file(bucket, key, dlfilename)
gzipfile = gzip.open(dlfilename, 'rt')
csvreader = csv.reader(gzipfile, delimiter=SEP, lineterminator=L_SEP, quoting=csv.QUOTE_NONE)
for line in csvreader:
#Divers traitements sont effectués ligne par ligne et stockés dans les données de sortie.
#Parmi les processus omis, il y a une importation qui est utilisée.
#Notez s'il vous plaît
except Exception as e:
print(e)
raise e
print("memory size at outputdata:"+str(sys.getsizeof(outputdata)))
os.remove(dlfilename)
uploadbinary = gzip.compress(bytes(outputdata , 'utf-8'))
print("memory size at uploadbinary:"+str(sys.getsizeof(uploadbinary)))
uploadfilename='processed_'+taragetfile
try:
bucket = S3OUTBACKET
key = S3OUTBASE+outputprefixA+"/"+outputprefixB+"/"+uploadfilename
obj = s3resource.Object(bucket,key)
obj.put( Body=uploadbinary )
except Exception as e:
print(e)
raise e
return 0
Lorsque je l'ai testé avec un fichier réel, j'ai eu une erreur de mémoire.
Le str (sys.getsizeof (outputdata))
au milieu du code est pour confirmation, et j'ai saisi la situation en regardant la taille de la mémoire. Bien qu'il ne soit pas écrit dans le code, il est bon de voir le taux de compression de gzip lui-même par rapport aux données cibles.
Les données que j'ai traitées cette fois-ci étaient de 50 Mo après la compression gzip, mais cela a pris 1000 Mo de mémoire pour les données traitées + les données compressées. Après tout, c'est quelque chose que vous ne pouvez pas comprendre à moins de l'essayer. Il peut être préférable d'étudier un peu plus la situation de la mémoire de Python.
Si vous augmentez la taille de la mémoire de Lambda, les ressources du processeur augmenteront également, donc cela dépend du contenu du traitement et de la taille du fichier, mais il est bon de vérifier à quelle vitesse le traitement sera une fois que le maximum est de 3008 Mo. Encore une fois, il y avait des cas où doubler la mémoire réduirait de moitié le temps de traitement.
Si le processus est effectué régulièrement, le réglage est ici très important car il est directement lié au coût de fonctionnement.
Lambda très pratique
Recommended Posts