As the title suggests: our data warehouse lives in Redshift and is normally processed with ELT, but there are cases where the data has to be processed programmatically.
Redshift's UNLOAD writes the result of a SQL query to S3 as gzip files. The idea is to have Lambda process each file, triggered by the put event on S3, and upload the result to S3 gzipped again. I tried it out.
On the UNLOAD side, the thing to keep in mind is Lambda's memory limit, which was 3008 MB at the time of writing. With this kind of processing, memory use inevitably grows with the file size, so I limited the size of the files handed to Lambda with UNLOAD's MAXFILESIZE parameter. The right value depends entirely on the case; this time I set it to 50 MB.
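As a rough illustration of the UNLOAD side, the statement could be assembled as below. This is only a sketch: `build_unload_sql` is a hypothetical helper, and the query, S3 prefix, and IAM role ARN in the usage example are placeholders, not values from my environment.

```python
def build_unload_sql(select_sql, s3_prefix, iam_role_arn, max_file_size_mb=50):
    """Build a Redshift UNLOAD statement that writes tab-delimited gzip files.

    Hypothetical helper for illustration; all arguments are placeholders.
    """
    # The SELECT is passed to UNLOAD as a quoted string, so single quotes
    # inside it have to be doubled.
    escaped_select = select_sql.replace("'", "''")
    return (
        f"UNLOAD ('{escaped_select}')\n"
        f"TO '{s3_prefix}'\n"
        f"IAM_ROLE '{iam_role_arn}'\n"
        "DELIMITER '\\t'\n"
        "GZIP\n"
        f"MAXFILESIZE {max_file_size_mb} MB;"
    )


if __name__ == "__main__":
    # Placeholder values only.
    print(build_unload_sql(
        "SELECT * FROM some_table WHERE status = 'active'",
        "s3://example-bucket/unload/some_table_",
        "arn:aws:iam::123456789012:role/ExampleUnloadRole",
    ))
```

MAXFILESIZE caps the size of each file UNLOAD writes, which is what keeps each Lambda invocation small.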
The Lambda code is below. The S3 trigger settings are omitted.
import json
import boto3
import urllib.parse
import os
import sys
import csv
import re
import traceback
import gzip
import subprocess

s3client = boto3.client('s3')
s3resource = boto3.resource('s3')
SEP = '\t'
L_SEP = '\n'
S3OUTBACKET = 'XXXXXXXX'
S3OUTBASE = 'athena/preprocessing/XXXXXXtmp/'

def lambda_handler(event, context):
    # Bucket and key of the object whose put event triggered this function
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
    taragetfile = os.path.split(key)[1]
    outputprefixA = os.path.split(key)[0].split("/")[-1]
    outputprefixB = os.path.split(key)[0].split("/")[-2]
    outputdata = ""
    try:
        # Download the unloaded gzip file to Lambda's /tmp and read it as text
        dlfilename = '/tmp/' + key.replace("/", "")
        s3client.download_file(bucket, key, dlfilename)
        with gzip.open(dlfilename, 'rt') as gzipfile:
            csvreader = csv.reader(gzipfile, delimiter=SEP, lineterminator=L_SEP, quoting=csv.QUOTE_NONE)
            for line in csvreader:
                # Various processing is performed line by line and the result is
                # stored in outputdata. The processing is omitted here.
                # Note that some of the imports above are used only by the
                # omitted processing.
                pass
    except Exception as e:
        print(e)
        raise e
    print("memory size at outputdata:" + str(sys.getsizeof(outputdata)))
    os.remove(dlfilename)
    # Compress the whole result in memory before uploading
    uploadbinary = gzip.compress(bytes(outputdata, 'utf-8'))
    print("memory size at uploadbinary:" + str(sys.getsizeof(uploadbinary)))
    uploadfilename = 'processed_' + taragetfile
    try:
        bucket = S3OUTBACKET
        key = S3OUTBASE + outputprefixA + "/" + outputprefixB + "/" + uploadfilename
        obj = s3resource.Object(bucket, key)
        obj.put(Body=uploadbinary)
    except Exception as e:
        print(e)
        raise e
    return 0
When I tested it with an actual file, I got a memory error.
The print lines with sys.getsizeof(outputdata) in the middle of the code are there for checking; I used them to see how large the data was in memory. Although it is not in the code, it is also worth checking the gzip compression ratio of the target data.
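The compression-ratio check mentioned here could look something like the following. It is a minimal sketch, not part of the Lambda above; `gzip_stats` is a name I made up for illustration. Note that `sys.getsizeof` on a `str` reports the size of the Python object, which can be several bytes per character for non-ASCII text, so it is reported separately from the UTF-8 byte length.

```python
import gzip
import sys


def gzip_stats(text):
    """Return rough size figures for a string before and after gzip."""
    raw = text.encode('utf-8')
    compressed = gzip.compress(raw)
    return {
        'str_object_bytes': sys.getsizeof(text),  # in-memory size of the str object
        'utf8_bytes': len(raw),                   # size once encoded for upload
        'gzip_bytes': len(compressed),            # size after gzip
        'ratio': len(raw) / len(compressed),      # uncompressed / compressed
    }


if __name__ == "__main__":
    # Made-up sample rows, only to show the call.
    sample = "id\tname\tvalue\n" * 100000
    print(gzip_stats(sample))
```

Logging these figures for one real file gives a rough multiplier for how much memory a 50 MB gzip input will need once it is expanded.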
The data I handled this time was 50 MB after gzip compression, but holding the processed data plus the compressed data took about 1000 MB of memory. In the end, this is something you cannot know until you actually try it. Python's memory behavior probably deserves a closer look.
Increasing Lambda's memory size also increases the CPU and other resources allocated to it. So, depending on the processing and the file size, it is worth setting the memory to the maximum (3008 MB at the time of writing) once and checking how much faster the processing gets. This time too, there were cases where doubling the memory halved the processing time.
If the process runs on a regular basis, this tuning feeds directly into running costs, so it is very important.
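One way to reduce the memory footprint, which I did not use in the code above, is to stream rows from the input gzip straight into an output gzip file under /tmp instead of building one big string. The sketch below assumes the per-row processing can be written as a function from one row to one row; `process_gzip_streaming` and `transform` are hypothetical names.

```python
import csv
import gzip

SEP = '\t'
L_SEP = '\n'


def process_gzip_streaming(src_path, dst_path, transform):
    """Read a gzip TSV row by row and write transformed rows to a new gzip file.

    Only one row is held in memory at a time. Returns the number of rows written.
    """
    rows = 0
    with gzip.open(src_path, 'rt', encoding='utf-8', newline='') as src, \
         gzip.open(dst_path, 'wt', encoding='utf-8', newline='') as dst:
        reader = csv.reader(src, delimiter=SEP, quoting=csv.QUOTE_NONE)
        for row in reader:
            # transform is a placeholder for the omitted per-row processing
            dst.write(SEP.join(transform(row)) + L_SEP)
            rows += 1
    return rows
```

The resulting file could then be uploaded with the S3 client's `upload_file` instead of `obj.put(Body=...)`. The trade-off is that the output now takes up space in /tmp, which is also limited.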
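To see why this matters for cost: Lambda's compute charge scales with configured memory multiplied by execution time (GB-seconds). The sketch below only does that arithmetic; the durations in the usage example are hypothetical, not measurements from my run, and request charges and price per GB-second are left out.

```python
def gb_seconds(memory_mb, duration_seconds):
    """Billable compute for one invocation, in GB-seconds."""
    return (memory_mb / 1024) * duration_seconds


if __name__ == "__main__":
    # Hypothetical durations: doubling memory halves the run time.
    before = gb_seconds(1024, 60)
    after = gb_seconds(2048, 30)
    print(before, after)
```

If doubling the memory really does halve the duration, the GB-seconds stay the same, so the job finishes faster at roughly the same compute cost. If the speedup is smaller than that, the larger setting costs more.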
Lambda is very convenient.