This is a story about developing a Lambda function in Python that periodically exports the logs stored in CloudWatch Logs to the S3 bucket created in "[AWS] CloudFormation to create S3 buckets and set lifecycle rules".
I have created a GitHub repository so you can try it out easily -> homoluctus/lambda-cwlogs-s3
- Exports the previous day's logs every day at JST 14:00
- Multiple log groups can be exported
Since I can't include all the code here, only the main parts are shown for explanation. See the repository for the complete source code.
Multiple log groups to export can be configured, similar to a TypeScript interface. If you want to add a log group to export, just inherit the LogGroup class (a concrete example follows the class definition below).
The S3 object keys are designed so that it is easy to see which log group a log belongs to and when it was produced. As written in the docstring, the hierarchy is dest_bucket/dest_obj_first_prefix or log_group/dest_obj_final_prefix/*. If dest_obj_first_prefix is not specified, the log_group name is used instead. The part after * becomes export task ID/log stream/file; this is added automatically and cannot be controlled.
from abc import ABCMeta
from typing import ClassVar, Dict, Union

# get_specific_time_on_yesterday / get_yesterday are time helpers
# defined elsewhere in the repository (a sketch of them follows below)


class LogGroup(object, metaclass=ABCMeta):
    """Configuration base class for exporting CloudWatch Logs to S3

    How to add a log group to export:

        class Example(LogGroup):
            log_group = 'test'
    """

    # log_group is required, everything else is optional
    log_group: ClassVar[str]
    log_stream: ClassVar[str] = ''
    start_time: ClassVar[int] = get_specific_time_on_yesterday(
        hour=0, minute=0, second=0)
    end_time: ClassVar[int] = get_specific_time_on_yesterday(
        hour=23, minute=59, second=59)
    dest_bucket: ClassVar[str] = 'lambda-cwlogs-s3'
    dest_obj_first_prefix: ClassVar[str] = ''
    dest_obj_final_prefix: ClassVar[str] = get_yesterday('%Y-%m-%d')

    @classmethod
    def get_dest_obj_prefix(cls) -> str:
        """Get the full S3 object prefix

        Hierarchical structure of S3:
            dest_bucket/dest_obj_first_prefix/dest_obj_final_prefix/*

        Returns:
            str
        """
        first_prefix = cls.dest_obj_first_prefix or cls.log_group
        return f'{first_prefix}/{cls.dest_obj_final_prefix}'

    @classmethod
    def to_args(cls) -> Dict[str, Union[str, int]]:
        # Build the keyword arguments passed to create_export_task
        args: Dict[str, Union[str, int]] = {
            'logGroupName': cls.log_group,
            'fromTime': cls.start_time,
            'to': cls.end_time,
            'destination': cls.dest_bucket,
            'destinationPrefix': cls.get_dest_obj_prefix()
        }
        if cls.log_stream:
            args['logStreamNamePrefix'] = cls.log_stream
        return args
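For reference, here is what the time helpers might look like, together with an example subclass. This is a minimal sketch: the helper implementations assume "yesterday" is computed in JST and returned as millisecond epoch values (which is what create_export_task expects for fromTime/to), and NginxAccess is a hypothetical log group used purely for illustration; see the repository for the real helpers.

from datetime import datetime, timedelta, timezone

JST = timezone(timedelta(hours=9))


def get_specific_time_on_yesterday(hour: int, minute: int, second: int) -> int:
    # Assumed behavior: a specific time on yesterday's date (JST),
    # returned as milliseconds since the Unix epoch
    yesterday = datetime.now(JST) - timedelta(days=1)
    specific = yesterday.replace(
        hour=hour, minute=minute, second=second, microsecond=0)
    return int(specific.timestamp() * 1000)


def get_yesterday(fmt: str) -> str:
    # Assumed behavior: yesterday's date (JST) formatted with fmt
    return (datetime.now(JST) - timedelta(days=1)).strftime(fmt)


# Hypothetical subclass: export an nginx access-log group
class NginxAccess(LogGroup):
    log_group = '/ecs/nginx-access'
    dest_obj_first_prefix = 'nginx-access'

With this configuration, exported objects land under lambda-cwlogs-s3/nginx-access/<YYYY-MM-DD>/<export task ID>/<log stream>/<file>.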
Exporter wraps the following two CloudWatch Logs APIs: create_export_task and describe_export_tasks. The status code of an export task is handled as follows:

- If it is COMPLETED, the export is complete
- CANCELLED and FAILED are handled as errors
- Otherwise, the task is still in progress

from dataclasses import InitVar, dataclass, field
from typing import Type

import boto3
from mypy_boto3_logs.client import CloudWatchLogsClient

# ExportToS3Error, GetExportTaskError and ExportToS3Failure are
# custom exceptions defined in the repository


@dataclass
class Exporter:
    region: InitVar[str]
    client: CloudWatchLogsClient = field(init=False)

    def __post_init__(self, region: str):
        self.client = boto3.client('logs', region_name=region)

    def export(self, target: Type[LogGroup]) -> str:
        """Export any CloudWatch Logs log group to S3

        Args:
            target (Type[LogGroup])

        Raises:
            ExportToS3Error

        Returns:
            str: taskId included in the response from the CloudWatch Logs API
        """
        try:
            response = self.client.create_export_task(
                **target.to_args())  # type: ignore
            return response['taskId']
        except Exception as err:
            raise ExportToS3Error(err)

    def get_export_progress(self, task_id: str) -> str:
        try:
            response = self.client.describe_export_tasks(taskId=task_id)
            status = response['exportTasks'][0]['status']['code']
            return status
        except Exception as err:
            raise GetExportTaskError(err)

    @classmethod
    def finishes(cls, status_code: str) -> bool:
        """Determine from the status code whether the export task has finished

        Args:
            status_code (str):
                status code included in the describe_export_tasks response

        Raises:
            ExportToS3Failure: if the status code is CANCELLED or FAILED

        Returns:
            bool
        """
        uppercase_status_code = status_code.upper()
        if uppercase_status_code == 'COMPLETED':
            return True
        elif uppercase_status_code in ['CANCELLED', 'FAILED']:
            raise ExportToS3Failure('Export failure to S3')
        return False
main
Get the child classes that configure the log groups you want to export with LogGroup.__subclasses__(). Since __subclasses__() returns a list, iterate over it with a for statement. Only one CloudWatch Logs export task can run at a time per account, so hit the describe_export_tasks API to check whether the task has completed; if it hasn't, wait 5 s and check again. Since create_export_task is an asynchronous API, we have no choice but to poll like this.
from logging import getLogger
from time import sleep
from typing import Any, Type

# logger setup (the repository may configure logging differently)
logger = getLogger(__name__)


def export_to_s3(exporter: Exporter, target: Type[LogGroup]) -> bool:
    task_id = exporter.export(target)
    logger.info(f'{target.log_group} is being exported to S3 ({task_id=})')

    # Only one export task can run at a time per account,
    # so poll until the current task finishes
    while True:
        status = exporter.get_export_progress(task_id)
        if exporter.finishes(status):
            return True
        sleep(5)


def main(event: Any, context: Any) -> bool:
    exporter = Exporter(region='ap-northeast-1')

    targets = LogGroup.__subclasses__()
    logger.info(f'Number of log groups to export: {len(targets)}')

    for target in targets:
        try:
            export_to_s3(exporter, target)
        except GetExportTaskError as err:
            logger.warning(err)
            logger.warning(f'Failed to get the progress of {target.log_group}')
        except Exception as err:
            logger.error(err)
            logger.error(f'Failed to export {target.log_group} to S3')
        else:
            logger.info(f'Exported {target.log_group} to S3')
    return True
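For a quick local check, the handler can also be invoked directly; a minimal sketch, assuming the unused event/context arguments can be stubbed with placeholders (this snippet is not part of the repository):

if __name__ == '__main__':
    # Local smoke test: main() ignores Lambda's event and context
    main({}, None)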
serverless.yml
The following is a partial excerpt.
Set up an IAM Role so that Lambda can create export tasks and get task information.
Since I want to run the export every day at JST 14:00, specify cron(0 5 * * ? *) for events. CloudWatch Events runs in UTC, so JST 14:00 minus 9 hours gives UTC 05:00, and the schedule fires at JST 14:00 as expected.
iamRoleStatements:
  - Effect: 'Allow'
    Action:
      - 'logs:CreateExportTask'
      - 'logs:DescribeExportTasks'
    Resource:
      - 'arn:aws:logs:${self:provider.region}:${self:custom.accountId}:log-group:*'

functions:
  export:
    handler: src/handler.main
    memorySize: 512
    timeout: 120
    events:
      - schedule: cron(0 5 * * ? *)
    environment:
      TZ: Asia/Tokyo
homoluctus/lambda-cwlogs-s3 also includes a GitHub Actions workflow and a CloudFormation template for creating the destination S3 bucket, so please refer to them as well.
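One point worth knowing about that destination bucket: CloudWatch Logs can only export to a bucket whose bucket policy grants the service access. Below is a minimal CloudFormation sketch of such a policy, assuming a hypothetical LogBucket resource; the repository's template is the authoritative version.

BucketPolicy:
  Type: AWS::S3::BucketPolicy
  Properties:
    Bucket: !Ref LogBucket
    PolicyDocument:
      Statement:
        # CloudWatch Logs must be able to read the bucket ACL ...
        - Effect: Allow
          Principal:
            Service: logs.ap-northeast-1.amazonaws.com
          Action: s3:GetBucketAcl
          Resource: !GetAtt LogBucket.Arn
        # ... and write the exported objects
        - Effect: Allow
          Principal:
            Service: logs.ap-northeast-1.amazonaws.com
          Action: s3:PutObject
          Resource: !Sub '${LogBucket.Arn}/*'
          Condition:
            StringEquals:
              s3:x-amz-acl: bucket-owner-full-control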