[Python] Exportez régulièrement de CloudWatch Logs vers S3 avec Lambda

Exportez régulièrement les journaux enregistrés dans CloudWatch Logs vers S3 créés dans [AWS] CloudFormation pour créer un compartiment S3 et définir des règles de cycle de vie C'est l'histoire du développement de Lambda en Python.

J'ai créé un référentiel GitHub pour un essai facile-> homoluctus / lambda-cwlogs-s3

Exigences

--Exporter le journal de la veille au JST 14h00 tous les jours --Plusieurs groupes de journaux peuvent être exportés

développement de

Langue

Bibliothèque de développement

Bibliothèque de production

Outil de déploiement

Service AWS

code

Il est impossible de mettre tout le code, donc seul le code principal est mis pour explication Voir Repositories pour le code source complet.

Classe pour le paramètre de groupe de journaux à exporter

Activé pour définir plusieurs groupes de journaux à exporter comme l'interface TypeScript. Si vous souhaitez ajouter un groupe de journaux à exporter, héritez simplement de la classe LogGroup.

La clé d'objet S3 à exporter est rendue facile à comprendre dans quel journal de groupe de journaux. Comme je l'ai écrit dans la docstring, il a une structure hiérarchique comme dest_bucket / dest_obj_first_prefix ou log_group / dest_obj_final_prefix / *. Si dest_obj_first_prefix n'est pas spécifié, le nom de log_group sera entré. Après *, cela ressemble à ʻexport task ID / log stream / file`. Ceci est automatiquement ajouté et ne peut pas être contrôlé.

class LogGroup(object, metaclass=ABCMeta):
    """Classe de base de configuration pour l'exportation de CloudWatch Logs vers S3

Comment ajouter un groupe de journaux à exporter

    class Example(LogGroup):
        log_group = 'test'
    """

    # log_groupe est obligatoire, sinon facultatif
    log_group: ClassVar[str]

    log_stream: ClassVar[str] = ''
    start_time: ClassVar[int] = get_specific_time_on_yesterday(
        hour=0, minute=0, second=0)
    end_time: ClassVar[int] = get_specific_time_on_yesterday(
        hour=23, minute=59, second=59)
    dest_bucket: ClassVar[str] = 'lambda-cwlogs-s3'
    dest_obj_first_prefix: ClassVar[str] = ''
    dest_obj_final_prefix: ClassVar[str] = get_yesterday('%Y-%m-%d')

    @classmethod
    def get_dest_obj_prefix(cls) -> str:
        """Obtenez le préfixe d'objet S3 complet

Structure hiérarchique de S3
        dest_bucket/dest_obj_first_prefix/dest_obj_final_prefix/*

        Returns:
            str
        """

        first_prefix = cls.dest_obj_first_prefix or cls.log_group
        return f'{first_prefix}/{cls.dest_obj_final_prefix}'

    @classmethod
    def to_args(cls) -> Dict[str, Union[str, int]]:
        args: Dict[str, Union[str, int]] = {
            'logGroupName': cls.log_group,
            'fromTime': cls.start_time,
            'to': cls.end_time,
            'destination': cls.dest_bucket,
            'destinationPrefix': cls.get_dest_obj_prefix()
        }

        if cls.log_stream:
            args['logStreamNamePrefix'] = cls.log_stream

        return args

Client CloudWatch Logs

Les deux suivants sont utilisés dans l'API CloudWatch Logs

@dataclass
class Exporter:
    region: InitVar[str]
    client: CloudWatchLogsClient = field(init=False)

    def __post_init__(self, region: str):
        self.client = boto3.client('logs', region_name=region)

    def export(self, target: Type[LogGroup]) -> str:
        """Exportez n'importe quel groupe de journaux CloudWatch Logs vers S3

        Args:
            target (Type[LogGroup])

        Raises:
            ExportToS3Error

        Returns:
            str:TaskId inclus dans la réponse de l'API CloudWatch Logs
        """

        try:
            response = self.client.create_export_task(
                **target.to_args())  # type: ignore
            return response['taskId']
        except Exception as err:
            raise ExportToS3Error(err)

    def get_export_progress(self, task_id: str) -> str:
        try:
            response = self.client.describe_export_tasks(taskId=task_id)
            status = response['exportTasks'][0]['status']['code']
            return status
        except Exception as err:
            raise GetExportTaskError(err)

    @classmethod
    def finishes(cls, status_code: str) -> bool:
        """Déterminez à partir du code d'état si la tâche d'exportation est terminée

        Args:
            status_code (str):
                describe_export_Code d'état inclus dans la réponse des tâches

        Raises:
            ExportToS3Failure:Si le code d'état est ANNULÉ ou ÉCHEC

        Returns:
            bool
        """

        uppercase_status_code = status_code.upper()

        if uppercase_status_code == 'COMPLETED':
            return True
        elif uppercase_status_code in ['CANCELLED', 'FAILED']:
            raise ExportToS3Failure('Échec d'exportation vers S3')
        return False

main

Utilisez LogGroup. \ _ \ _ Subclasses \ _ \ _ () pour obtenir la classe enfant définie du groupe de journaux que vous souhaitez exporter. \ _ \ _ Subclasses \ _ \ _ () renvoie une liste, alors tournez-la avec une instruction for. Vous ne pouvez exécuter qu'une seule tâche d'exportation CloudWatch Logs à la fois dans votre compte, alors appuyez sur l'API describe_export_tasks pour voir si la tâche est terminée. S'il n'est pas terminé, j'attendrai 5s. Puisque create_export_task est une API asynchrone, nous n'avons pas d'autre choix que de l'interroger de cette façon.

def export_to_s3(exporter: Exporter, target: Type[LogGroup]) -> bool:
    task_id = exporter.export(target)
    logger.info(f'{target.log_group}Est en cours d'exportation vers S3({task_id=})')

    while True:
        status = exporter.get_export_progress(task_id)
        if exporter.finishes(status):
            return True
        sleep(5)


def main(event: Any, context: Any) -> bool:
    exporter = Exporter(region='ap-northeast-1')
    targets = LogGroup.__subclasses__()
    logger.info(f'Le groupe de journaux à exporter est{len(targets)}Pièces')

    for target in targets:
        try:
            export_to_s3(exporter, target)
        except GetExportTaskError as err:
            logger.warning(err)
            logger.warning(f'{target.log_group}Échec de la progression')
        except Exception as err:
            logger.error(err)
            logger.error(f'{target.log_group}Échec de l'exportation vers S3')
        else:
            logger.info(f'{target.log_group}Exporter vers S3')

    return True

serverless.yml

Ce qui suit est un extrait partiel. Définissez le rôle IAM afin que Lambda puisse créer des tâches d'exportation et obtenir des informations sur les tâches. Ensuite, je veux exécuter l'exportation tous les jours à JST 14:00, alors spécifiez cron (0 5 * *? *) Pour les événements. CloudWatch Events s'exécute à UTC, donc si vous faites -9h, il s'exécutera à JST 14h00 comme prévu.

  iamRoleStatements:
    - Effect: 'Allow'
      Action:
        - 'logs:createExportTask'
        - 'logs:DescribeExportTasks'
      Resource:
        - 'arn:aws:logs:${self:provider.region}:${self:custom.accountId}:log-group:*'

functions:
  export:
    handler: src/handler.main
    memorySize: 512
    timeout: 120
    events:
      - schedule: cron(0 5 * * ? *)
    environment:
      TZ: Asia/Tokyo

en conclusion

homoluctus / lambda-cwlogs-s3 dispose également d'un modèle CloudFormation pour créer des actions GitHub et des S3 de destination. Veuillez vous y référer.

Reference

Recommended Posts

[Python] Exportez régulièrement de CloudWatch Logs vers S3 avec Lambda
Connectez-vous à s3 avec AWS Lambda Python
Déplacer régulièrement les journaux CloudWatch vers S3 avec Lambda
[Lambda] [Python] Publier sur Twitter depuis Lambda!
Je veux analyser les journaux avec Python
Comment accéder à RDS depuis Lambda (python)
Téléchargez ce que vous avez dans la demande vers S3 avec AWS Lambda Python
Copier des données d'Amazon S3 vers Google Cloud Storage avec Python (boto)
Exécutez régulièrement le scraping WEB avec AWS-Lambda + Python + Cron
J'ai essayé d'obtenir des données CloudWatch avec Python
Exemple de notification Slack avec python lambda
Télécharger des fichiers sur Google Drive avec Lambda (Python)
De la construction d'environnement Python à la construction d'environnement virtuel avec anaconda
Changements de Python 3.0 à Python 3.5
Comment récupérer des données d'image de Flickr avec Python
De l'achat d'un ordinateur à l'exécution d'un programme sur python
Je veux AWS Lambda avec Python sur Mac!
Copier les fichiers S3 de Python vers GCS à l'aide de GSUtil
Accès ODBC à SQL Server depuis Linux avec Python
Fonction Lambda (version python) qui décompresse et génère des éléments dans CloudWatch Logs lorsqu'un fichier compressé est téléchargé vers s3
Connectez-vous à BigQuery avec Python
Publier de Python vers Slack
[S3] CRUD avec S3 utilisant Python [Python]
Flirter de PHP à Python
Connectez-vous à Wikipedia avec Python
[AWS] Essayez d'ajouter la bibliothèque Python à la couche avec SAM + Lambda (Python)
Publiez sur Slack avec Python 3
Opération S3 avec python boto3
Anaconda mis à jour de 4.2.0 à 4.3.0 (python3.5 mis à jour vers python3.6)
Interroger Athena depuis Lambda Python
Défi problème 5 avec Python: lambda ... j'ai décidé de copier sans
Introduction à Python pour les utilisateurs de VBA - Appeler Python depuis Excel avec xlwings -
Passer de python2.7 à python3.6 (centos7)
Connectez-vous à sqlite depuis python
Basculer python vers 2.7 avec des alternatives
Écrire en csv avec Python
Avec skype, notifiez avec skype de python!
Comment utiliser Python lambda
Traitez le fichier gzip UNLOADed avec Redshift avec Python de Lambda, gzipez-le à nouveau et téléchargez-le sur S3
Une histoire que j'ai corrigée lorsque j'ai obtenu le journal Lambda de Cloudwatch Logs
[Python 3.8 ~] Comment définir intelligemment des fonctions récursives avec des expressions lambda
Envoyer les images prises avec ESP32-WROOM-32 vers AWS (API Gateway → Lambda → S3)
Essayez d'extraire une chaîne de caractères d'une image avec Python3
Précautions lors de l'exécution de Python sur EC2 à partir d'AWS Lambda (Exécuter la commande)
Introduction à l'analyse de données par Python P17-P26 [ch02 1.usa.gov données de bit.ly]
Manipulation des données Kintone avec le pilote ODBC Python & C Data d'AWS Lambda
J'ai lu "Renforcer l'apprentissage avec Python de l'introduction à la pratique" Chapitre 1
De l'introduction de JUMAN ++ à l'analyse morphologique du japonais avec Python
Passer la liste de Python vers C ++ par référence dans pybind11
J'ai lu "Renforcer l'apprentissage avec Python de l'introduction à la pratique" Chapitre 2
Distribution HDA de Houdini pour exporter FBX avec hiérarchie et transformations
Essayez d'obtenir des métriques CloudWatch avec la source de données python re: dash
Le moyen le plus rapide d'obtenir régulièrement des images de caméra avec opencv de python
[Python] Essayez de reconnaître les caractères des images avec OpenCV et pyocr
Appelez Matlab depuis Python pour optimiser
[Présentation de l'application Udemy Python3 +] 58. Lambda
Python: comment utiliser async avec
Appeler C depuis Python avec DragonFFI
Utilisation de Rstan de Python avec PypeR