I said, "Let's spit out the application log, analysis data, or S3! Pass? You can think about it later, so you can cut it with yyyy-mm-dd
for the time being! "
~ 1 year later ~
I said, "Why is the data stored in a path format that is difficult to analyze?"
It was a story of trying to do something because it was in such a state.
As in the example above, if you write to S3 with a Key like the following, without giving much thought to how it will be used later:

s3://BUCKET_NAME/path/to/2020-01-01/log.json

then when you later want to analyze it and query these files with Athena or the like, you run into the problem that partitions cannot be applied properly by date.
What does that mean? Even if you decide, "Let's analyze all of the data for January 2019!", in S3 a string like 2019-01-01 is nothing more than part of the Key, so it is extremely difficult to issue a query along the lines of 2019-01-*.
Therefore, we will convert the S3 storage layout to Hive format. Hive format looks like this:

s3://BUCKET_NAME/path/to/year=2020/month=01/date=01/log.json

If you store Objects under Keys like this and partition the Athena table by year/month/date, you can narrow a query down to specific dates in the SQL WHERE clause, which makes analysis much easier.
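For instance, once the partitions are in place, a query like the one below only scans January 2019. This is a hedged sketch using boto3's Athena client; the table name, database, and query result location are placeholders, not anything defined in this article.

```python
import boto3

athena = boto3.client('athena')

# The year/month/date partition columns let Athena prune everything
# outside January 2019 instead of scanning the whole prefix.
query = """
    SELECT *
    FROM logs
    WHERE year = '2019' AND month = '01'
"""

athena.start_query_execution(
    QueryString=query,
    QueryExecutionContext={'Database': 'analytics_db'},
    ResultConfiguration={'OutputLocation': 's3://BUCKET_NAME/athena-query-results/'}
)
```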
However, S3 is object storage that holds Objects as Key-Value pairs, so you cannot simply rewrite the Keys of existing Objects in place. I therefore wrote a script that changes the Keys of the Objects in a specified period to Hive format in one go, and ran it from Lambda.
The Lambda I created is the simple one-file script below.
```python
import os
import boto3
from datetime import datetime, timedelta

# Load Environment Variables
S3_BUCKET_NAME = os.environ['S3_BUCKET_NAME']
S3_BEFORE_KEY = os.environ['S3_BEFORE_KEY']
S3_AFTER_KEY = os.environ['S3_AFTER_KEY']
S3_BEFORE_FORMAT = os.environ['S3_BEFORE_FORMAT']
FROM_DATE = os.environ['FROM_DATE']
TO_DATE = os.environ['TO_DATE']
DELETE_FRAG = os.environ['DELETE_FRAG']


def date_range(from_date: datetime, to_date: datetime):
    """
    Create Generator Range of Date

    Args:
        from_date (datetime) : datetime param of start date
        to_date (datetime) : datetime param of end date
    Returns:
        Generator
    """
    diff = (to_date - from_date).days + 1
    return (from_date + timedelta(i) for i in range(diff))


def pre_format_key():
    """
    Reformat S3 Key Parameter given

    Args:
        None
    Returns:
        None
    """
    global S3_BEFORE_KEY
    global S3_AFTER_KEY
    # Strip trailing slashes so the paths can be joined consistently below
    if S3_BEFORE_KEY[-1] == '/':
        S3_BEFORE_KEY = S3_BEFORE_KEY[:-1]
    if S3_AFTER_KEY[-1] == '/':
        S3_AFTER_KEY = S3_AFTER_KEY[:-1]


def change_s3_key(date: datetime):
    """
    Change S3 key from datetime format to Hive format at specific date

    Args:
        date (datetime) : target date to change key
    Returns:
        None
    """
    before_date_str = datetime.strftime(date, S3_BEFORE_FORMAT)
    print('Change following date key format : {}'.format(before_date_str))
    before_path = f'{S3_BEFORE_KEY}/{before_date_str}/'
    after_path = "{}/year={}/month={}/date={}".format(
        S3_AFTER_KEY, date.strftime('%Y'), date.strftime('%m'), date.strftime('%d')
    )
    s3 = boto3.client('s3')
    response = s3.list_objects_v2(
        Bucket=S3_BUCKET_NAME,
        Delimiter="/",
        Prefix=before_path
    )
    try:
        for content in response["Contents"]:
            key = content['Key']
            file_name = key.split('/')[-1]
            after_key = f'{after_path}/{file_name}'
            # Copy the Object to the Hive-formatted Key,
            # then optionally delete the original
            s3.copy_object(
                Bucket=S3_BUCKET_NAME,
                CopySource={'Bucket': S3_BUCKET_NAME, 'Key': key},
                Key=after_key
            )
            if DELETE_FRAG == 'True':
                s3.delete_object(Bucket=S3_BUCKET_NAME, Key=key)
    except Exception as e:
        print(e)
        return


def lambda_handler(event, context):
    pre_format_key()
    from_date = datetime.strptime(FROM_DATE, "%Y%m%d")
    to_date = datetime.strptime(TO_DATE, "%Y%m%d")
    for date in date_range(from_date, to_date):
        change_s3_key(date)
```
To run it, the following settings need to be configured on the Lambda.

- Set the environment variables listed in the table below
Environment variable | Value | Remarks |
---|---|---|
S3_BUCKET_NAME | S3 bucket name | |
S3_BEFORE_KEY | S3 key (path/to) before the change | |
S3_AFTER_KEY | S3 key (path/to) after the change | Same value as above if the Objects do not need to move |
S3_BEFORE_FORMAT | Date format before the change | A format Python's datetime can parse, e.g. %Y-%m-%d |
FROM_DATE | Start date (yyyymmdd) | First date of the Objects whose Keys you want to change |
TO_DATE | End date (yyyymmdd) | Last date of the Objects whose Keys you want to change |
DELETE_FRAG | True/False | Whether to delete the original Objects |
- Grant the Lambda execution Role permission to operate on the target S3 bucket (a minimal policy sketch follows below)
- Adjust the timeout and allocated memory as appropriate
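As for what the script actually needs, an inline policy along these lines should be enough: listing the bucket plus reading, copying, and (when DELETE_FRAG is True) deleting objects. This is only a sketch under the assumption that you attach it with boto3; the role and policy names are placeholders.

```python
import json
import boto3

iam = boto3.client('iam')

# Minimal permissions for the script: list the bucket, copy objects
# (which needs GetObject on the source and PutObject on the destination),
# and delete the originals when DELETE_FRAG is 'True'.
policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": ["s3:ListBucket"],
            "Resource": "arn:aws:s3:::BUCKET_NAME"
        },
        {
            "Effect": "Allow",
            "Action": ["s3:GetObject", "s3:PutObject", "s3:DeleteObject"],
            "Resource": "arn:aws:s3:::BUCKET_NAME/*"
        }
    ]
}

# The role and policy names here are placeholders.
iam.put_role_policy(
    RoleName='change-s3-key-lambda-role',
    PolicyName='change-s3-key-s3-access',
    PolicyDocument=json.dumps(policy)
)
```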
Everything that needs to be configured is exposed as environment variables, so adjust them to match your own environment. Note that error handling is barely implemented: this is a one-off script that only needs to run once, so I kept it to a minimum. Feel free to improve it if you are interested.
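If you want to try the handler locally before running it against real data, a minimal sketch like the following works, assuming your AWS credentials are configured. The values are placeholders, and because the script reads its environment variables at import time, they have to be set before the module is imported.

```python
import os

# Placeholder values; replace them with your own bucket and prefixes.
os.environ.update({
    'S3_BUCKET_NAME': 'BUCKET_NAME',
    'S3_BEFORE_KEY': 'path/to/',
    'S3_AFTER_KEY': 'path/to/',
    'S3_BEFORE_FORMAT': '%Y-%m-%d',
    'FROM_DATE': '20200101',
    'TO_DATE': '20200107',
    'DELETE_FRAG': 'False',  # keep the original Objects on a trial run
})

# Import after the environment variables are set, since the script
# reads them at module load time.
import lambda_function

lambda_function.lambda_handler({}, None)
```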
With that, I was able to safely convert the existing S3 Keys from a plain date format to Hive format, ending up with a layout that is easy to analyze.
As a bonus, if you run a Glue Crawler over the path/to/ layer, a Data Catalog that Athena can read, including the Partitions, is generated automatically, which makes your analysis life in Athena even better.
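Kicking the crawler from Python is a one-liner with boto3, shown below as a sketch; it assumes a crawler has already been created and pointed at s3://BUCKET_NAME/path/to/, and the crawler name is a placeholder.

```python
import boto3

glue = boto3.client('glue')

# Assumes a crawler pointed at s3://BUCKET_NAME/path/to/ already exists;
# the crawler name is a placeholder.
glue.start_crawler(Name='hive-format-logs-crawler')
```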
If anything about this implementation looks off, or if you have ideas for doing it better, please let me know! It's nothing fancy, but the repository is public: https://github.com/kzk-maeda/change-s3-key/blob/master/lambda_function.py