J'ai dit: "Crachons le journal de l'application, les données d'analyse ou S3 pour le moment! Passé? Vous pourrez y réfléchir plus tard, afin de pouvoir le couper avec aaaa-mm-jj
pour le moment!"
~ 1 an plus tard ~
J'ai dit: "Pourquoi les données sont-elles stockées dans un format de chemin difficile à analyser?"
C'était une histoire d'essayer de faire quelque chose parce que c'était dans un tel état.
Comme dans l'exemple ci-dessus, si vous avez une sortie vers S3 avec la clé suivante sans tenir compte de l'opération en particulier
s3://BUCKET_NAME/path/to/2020-01-01/log.json
Lorsque je veux l'analyser, lorsque j'utilise Athena, etc. pour interroger le fichier ici, je me retrouve dans une situation où la partition appropriée ne peut pas être appliquée à la date.
Qu'est-ce que ça veut dire? "Analysons les données de janvier 2019 à tous les niveaux!" Même ainsi, dans S3, il est extrêmement difficile de faire une requête comme «2019-01- *», simplement parce que la chaîne de caractères comme «2019-01-01» est la clé.
Par conséquent, nous envisagerons de convertir la méthode de stockage au format S3 au format Hive. Le format Hive est le suivant.
s3://BUCKET_NAME/path/to/year=2020/month=01/date=01/log.json
Si vous enregistrez l'objet avec une telle clé et coupez la partition pour aaaa / mm / jj sur la table Athena, vous pouvez exécuter la requête en la divisant en dates spécifiques dans la clause Where de SQL. Ce sera plus facile à analyser.
Cependant, étant donné que S3 est un format de stockage qui stocke les objets au format clé-valeur, il n'est pas possible de modifier la clé d'objet à écrire en même temps. Par conséquent, j'ai créé un script pour changer le format Clé en Hive à la fois pour l'objet de la période spécifiée et l'ai exécuté à partir de Lambda.
Immédiatement, le Lambda créé est un simple script à 1 fichier, comme illustré ci-dessous.
import os
import boto3
from datetime import datetime, timedelta
# Load Environment Variables
S3_BUCKET_NAME = os.environ['S3_BUCKET_NAME']
S3_BEFORE_KEY = os.environ['S3_BEFORE_KEY']
S3_AFTER_KEY = os.environ['S3_AFTER_KEY']
S3_BEFORE_FORMAT = os.environ['S3_BEFORE_FORMAT']
FROM_DATE = os.environ['FROM_DATE']
TO_DATE = os.environ['TO_DATE']
DELETE_FRAG = os.environ['DELETE_FRAG']
def date_range(from_date: datetime, to_date: datetime):
"""
Create Generator Range of Date
Args:
from_date (datetime) : datetime param of start date
to_date (datetime) : datetime param of end date
Returns:
Generator
"""
diff = (to_date - from_date).days + 1
return (from_date + timedelta(i) for i in range(diff))
def pre_format_key():
"""
Reformat S3 Key Parameter given
Args:
None
Returns:
None
"""
global S3_BEFORE_KEY
global S3_AFTER_KEY
if S3_BEFORE_KEY[-1] == '/':
S3_BEFORE_KEY = S3_BEFORE_KEY[:-1]
if S3_AFTER_KEY[-1] == '/':
S3_AFTER_KEY = S3_AFTER_KEY[:-1]
def change_s3_key(date: datetime):
"""
Change S3 key from datetime format to Hive format at specific date
Args:
date (datetime) : target date to change key
Returns:
None
"""
before_date_str = datetime.strftime(date, S3_BEFORE_FORMAT)
print('Change following date key format : {}'.format(before_date_str))
before_path = f'{S3_BEFORE_KEY}/{before_date_str}/'
after_path = "{}/year={}/month={}/date={}".format(
S3_AFTER_KEY, date.strftime('%Y'), date.strftime('%m'), date.strftime('%d')
)
s3 = boto3.client('s3')
response = s3.list_objects_v2(
Bucket=S3_BUCKET_NAME,
Delimiter="/",
Prefix=before_path
)
try:
for content in response["Contents"]:
key = content['Key']
file_name = key.split('/')[-1]
after_key = f'{after_path}/{file_name}'
s3.copy_object(
Bucket=S3_BUCKET_NAME,
CopySource={'Bucket': S3_BUCKET_NAME, 'Key': key},
Key=after_key
)
if DELETE_FRAG == 'True':
s3.delete_object(Bucket=S3_BUCKET_NAME, Key=key)
except Exception as e:
print(e)
return
def lambda_handler(event, context):
pre_format_key()
from_date = datetime.strptime(FROM_DATE, "%Y%m%d")
to_date = datetime.strptime(TO_DATE, "%Y%m%d")
for date in date_range(from_date, to_date):
change_s3_key(date)
Au moment de l'exécution, il est nécessaire de saisir les paramètres suivants dans Lambda.
--Définissez les éléments suivants dans les variables d'environnement
Variable d'environnement | valeur | Remarques |
---|---|---|
S3_BUCKET_NAME | Nom du compartiment S3 | |
S3_BEFORE_KEY | Avant de changer la clé S3 (chemin)/to) | |
S3_AFTER_KEY | Avant de changer la clé S3 (chemin)/to) | Même valeur que ci-dessus si le mouvement des touches n'est pas nécessaire |
S3_BEFORE_FORMAT | Format de date avant le changement | %Y-%m-%d Formats que Python datetime peut reconnaître |
FROM_DATE | date de début(yyyymmdd) | Le point de départ de l'objet dont vous souhaitez modifier la clé |
TO_DATE | Date de fin(yyyymmdd) | Point final de l'objet dont vous souhaitez modifier la clé |
DELETE_FRAG | True/False | S'il faut supprimer l'objet d'origine |
--Grant Lambda execution Rôle l'autorité d'opération du compartiment cible de S3
Les paramètres nécessaires ont été convertis en variables d'environnement, veuillez donc les définir comme vous le souhaitez en fonction de votre propre environnement. En outre, la gestion des erreurs est lourde et non mise en œuvre. Puisqu'il s'agit d'un script qui n'est exécuté qu'une seule fois par SPOT, il est réduit au minimum. Veuillez corriger si vous êtes intéressé.
Nous avons changé la clé S3 existante du format de date normal au format Hive et avons pu faciliter l'analyse en toute sécurité.
Comme information supplémentaire, si vous exécutez Glue Cralwer dans le chemin / vers / couche, un catalogue de données qui peut être lu par Athena est automatiquement généré, y compris la partition, de sorte que la vie d'analyse dans Athena sera enrichie.
S'il vous plaît laissez-moi savoir si l'implémentation ici est étrange ou si vous voulez faire plus comme ça! Ce n'est pas un gros problème, mais je vais garder le référentiel public. https://github.com/kzk-maeda/change-s3-key/blob/master/lambda_function.py
Recommended Posts