Utilisez des programmes Python avec le plugin de sortie exec_filter de fluentd

exec_filter Output Plugin (out_ecec_filter) est fondamentalement un processus de filtrage qui peut être effectué en utilisant n'importe quel programme. Par exemple, vous pouvez supprimer les enregistrements inutiles pour le processus suivant ou les enregistrer quelque part comme une erreur. Utilisons maintenant un programme Python.

Réglage

J'ai essayé comme suit.

<match hoge>
  type       exec_filter
  command    /path/to/python -m myapp.fluent_stream_filter

  time_key   time
  in_format  json
  out_format msgpack

  buffer_type file
  buffer_path /path/to/buffer
  buffer_chunk_limit 8m
  buffer_queue_limit 64
  flush_interval 10s

  tag        fuga
</match>

Je l'exécute en tant que module avec command pour que l'importation fonctionne bien. Dans ce cas, il n'est pas nécessaire de spécifier PYTHONPATH.

J'utilise un tampon de fichier car je dois redémarrer fluend lorsque je modifie le code. Contrairement au plug-in de sortie exec, le plug-in de sortie exec_filter attend l'entrée standard après le démarrage du script spécifié.

scénario

Comme demandé par le plugin de sortie exec_filter, le script doit prendre l'entrée standard et envoyer le résultat vers la sortie standard.

fluent_stream_filter.py


# coding=utf-8
"""
fluentd_exec_Code appelé par filtre

Traite ligne par ligne de l'entrée standard et sort le résultat vers la sortie standard
"""
import json
import logging
import logging.config
import sys
import traceback

import msgpack

logging.config.fileConfig('/path/to/logging.conf')
logger = logging.getLogger('fluent-exec')
logger.propagate = False

def main():
    stdin = sys.stdin
    output(convert(validate(parse(readline(stdin)))))

def readline(stdin):
    for line in stdin:
        yield line

def parse(lines):
    for line in lines:
        yield json.loads(line)

def validate(rows):
    """Suppression des données d'erreur"""
    for row in rows:
        try:
		     #Une sorte d'inspection
        except Exception, e:
            logger.warn(e)
            logger.warn(row)
            continue
        yield row

def convert(rows):
    """Certains processus de conversion"""
    for row in rows:
        # do something
        yield row

def output(rows):
	"""Expirez à la sortie standard"""
    for row in rows:
        sys.stdout.write(msgpack.packb(row))

if __name__ == '__main__':
    logger.info('Start main')
    try:
        main()
    except Exception, e:
        logger.error(traceback.format_exc())
        raise e
    logger.info('End main')

Il est habituel de passer «name» à logging.getLogger, mais ici le contenu est «__ main __» parce que le script est lancé, donc le nom de l'enregistreur est spécifié par une chaîne de caractères.

Code de test

Il peut être préférable pour la méthode qui gère l'entrée standard de recevoir stdio en tant que paramètre. Je n'ai pas trouvé de moyen d'utiliser l'entrée standard telle qu'elle est dans le test, j'ai donc passé une instance de StringIO afin que les données de test puissent être lues.

# coding=utf-8
import types
from StringIO import StringIO

from nose.tools import ok_, eq_

from myapp import fluent_stream_filter


class TestAll(object):
    def test_readline(self):
        """test readline, sys.Passer StringIO au lieu de stdin"""
        stdin = StringIO()
        stdin.write(open('/path/to/test_data.log').read())
        stdin.seek(0)

        stream = fluent_stream_filter.readline(stdin)
        ok_(isinstance(stream, types.GeneratorType))
        eq_(stream.next(), u'{Contenu de la première ligne}')

    def test_parse(self):
        """test d'analyse"""
        stream = fluent_stream_filter.parse(iter(['{...}', '{...}']))
        ok_(isinstance(stream, types.GeneratorType))
        eq_(stream.next(), {...})
        eq_(stream.next(), {...})

    # (Abréviation)

Attention dans la sortie du journal

Si vous crachez un journal autre que le résultat du traitement vers la sortie standard, bien sûr, il ne peut pas être analysé comme MessagePack ou JSON, donc une erreur se produira du côté fluentd. Pour éviter les accidents tels que le gestionnaire de console défini dans le journal racine et cracher involontairement des journaux sur la sortie standard

logger.propagate = False

C'est sûr de faire

Recommended Posts

Utilisez des programmes Python avec le plugin de sortie exec_filter de fluentd
Utilisez des programmes Python avec le plugin de sortie exec de fluentd
[Python] Un programme qui crée des escaliers avec #
Un programme qui utilise Python pour lire des fichiers indésirables
[Python] Générer ValueObject avec un constructeur complet à l'aide de classes de données
Essayez d'incorporer Python dans un programme C ++ avec pybind11
Enregistrez des tickets avec l'API de Redmine en utilisant des requêtes Python
De l'achat d'un ordinateur à l'exécution d'un programme sur python
[Python] Créez un fichier de distribution pour le programme Tkinter avec cx_Freeze
Créer un plugin Wox (Python)
[S3] CRUD avec S3 utilisant Python [Python]
Utilisation de Quaternion avec Python ~ numpy-quaternion ~
Essayez la sortie Python avec Haxe 3.2
[Python] Utilisation d'OpenCV avec Python (basique)
Utiliser une imprimante avec Debian 10
Faites une loterie avec Python
Créer un répertoire avec python
Utiliser OpenCV avec Python @Mac
Envoyer en utilisant Python avec Gmail
J'ai écrit rapidement un programme pour étudier la DI avec Python ①
[Python] Chapitre 01-03 À propos de Python (Ecrire et exécuter un programme à l'aide de PyCharm)
Compléter python avec emacs en utilisant company-jedi
Convertir en chaîne lors de la sortie de la sortie standard avec le sous-processus Python
Moyenne harmonique par Python (en utilisant SciPy)
[Python] Qu'est-ce qu'une instruction with?
Comment démarrer par lots un programme Python créé avec le notebook Jupyter
Résoudre ABC163 A ~ C avec Python
Contrôlez le moteur avec un pilote de moteur en utilisant python sur Raspberry Pi 3!
Faites fonctionner l'imprimante de reçus avec python
Manuel de graphisme Python avec Matplotlib.
[Python] Utilisation d'OpenCV avec Python (filtrage d'image)
J'ai fait un Line-bot avec Python!
Créer une interface graphique python à l'aide de tkinter
[Python] Utilisation d'OpenCV avec Python (transformation d'image)
Faisons une interface graphique avec python.
Dessiner une courbe Silverstone en utilisant Python
Résoudre ABC166 A ~ D avec Python
[Python] Utilisation d'OpenCV avec Python (détection des bords)
Sortie vers un fichier csv avec Python
Entrée / sortie avec Python (mémo d'apprentissage Python ⑤)
Déboguer un programme multi-processus python avec VSCode
Créez un environnement virtuel avec Python!
J'ai fait une loterie avec Python.
Créer un environnement virtuel avec Python 3
Installer le plug-in Python avec Netbeans 8.0.2
Résoudre ABC168 A ~ C avec Python
[Note] Sortie Hello world avec python
Sortie du journal de test unitaire avec python
Créer un système de recommandation avec python
[Ev3dev] Créez un programme qui capture LCD (écran) en utilisant python
Lisez le fichier en Python avec un chemin relatif depuis le programme
Ecrire le plugin vim en Python
[Python] Générer un mot de passe avec Slackbot
Résoudre ABC162 A ~ C avec Python
J'ai essayé de créer une application todo en utilisant une bouteille avec python
Notes sur l'utilisation de rstrip avec python.
Résoudre ABC167 A ~ C avec Python
Faisons un graphe avec python! !!
Créez un environnement d'exécution Python à l'aide de GPU avec GCP Compute Engine