exec_filter Output Plugin (out_ecec_filter) est fondamentalement un processus de filtrage qui peut être effectué en utilisant n'importe quel programme. Par exemple, vous pouvez supprimer les enregistrements inutiles pour le processus suivant ou les enregistrer quelque part comme une erreur. Utilisons maintenant un programme Python.
J'ai essayé comme suit.
<match hoge>
type exec_filter
command /path/to/python -m myapp.fluent_stream_filter
time_key time
in_format json
out_format msgpack
buffer_type file
buffer_path /path/to/buffer
buffer_chunk_limit 8m
buffer_queue_limit 64
flush_interval 10s
tag fuga
</match>
Je l'exécute en tant que module avec command
pour que l'importation fonctionne bien. Dans ce cas, il n'est pas nécessaire de spécifier PYTHONPATH.
J'utilise un tampon de fichier car je dois redémarrer fluend lorsque je modifie le code. Contrairement au plug-in de sortie exec, le plug-in de sortie exec_filter attend l'entrée standard après le démarrage du script spécifié.
Comme demandé par le plugin de sortie exec_filter, le script doit prendre l'entrée standard et envoyer le résultat vers la sortie standard.
fluent_stream_filter.py
# coding=utf-8
"""
fluentd_exec_Code appelé par filtre
Traite ligne par ligne de l'entrée standard et sort le résultat vers la sortie standard
"""
import json
import logging
import logging.config
import sys
import traceback
import msgpack
logging.config.fileConfig('/path/to/logging.conf')
logger = logging.getLogger('fluent-exec')
logger.propagate = False
def main():
stdin = sys.stdin
output(convert(validate(parse(readline(stdin)))))
def readline(stdin):
for line in stdin:
yield line
def parse(lines):
for line in lines:
yield json.loads(line)
def validate(rows):
"""Suppression des données d'erreur"""
for row in rows:
try:
#Une sorte d'inspection
except Exception, e:
logger.warn(e)
logger.warn(row)
continue
yield row
def convert(rows):
"""Certains processus de conversion"""
for row in rows:
# do something
yield row
def output(rows):
"""Expirez à la sortie standard"""
for row in rows:
sys.stdout.write(msgpack.packb(row))
if __name__ == '__main__':
logger.info('Start main')
try:
main()
except Exception, e:
logger.error(traceback.format_exc())
raise e
logger.info('End main')
Il est habituel de passer «name» à logging.getLogger, mais ici le contenu est «__ main __» parce que le script est lancé, donc le nom de l'enregistreur est spécifié par une chaîne de caractères.
Il peut être préférable pour la méthode qui gère l'entrée standard de recevoir stdio en tant que paramètre. Je n'ai pas trouvé de moyen d'utiliser l'entrée standard telle qu'elle est dans le test, j'ai donc passé une instance de StringIO afin que les données de test puissent être lues.
# coding=utf-8
import types
from StringIO import StringIO
from nose.tools import ok_, eq_
from myapp import fluent_stream_filter
class TestAll(object):
def test_readline(self):
"""test readline, sys.Passer StringIO au lieu de stdin"""
stdin = StringIO()
stdin.write(open('/path/to/test_data.log').read())
stdin.seek(0)
stream = fluent_stream_filter.readline(stdin)
ok_(isinstance(stream, types.GeneratorType))
eq_(stream.next(), u'{Contenu de la première ligne}')
def test_parse(self):
"""test d'analyse"""
stream = fluent_stream_filter.parse(iter(['{...}', '{...}']))
ok_(isinstance(stream, types.GeneratorType))
eq_(stream.next(), {...})
eq_(stream.next(), {...})
# (Abréviation)
Si vous crachez un journal autre que le résultat du traitement vers la sortie standard, bien sûr, il ne peut pas être analysé comme MessagePack ou JSON, donc une erreur se produira du côté fluentd. Pour éviter les accidents tels que le gestionnaire de console défini dans le journal racine et cracher involontairement des journaux sur la sortie standard
logger.propagate = False
C'est sûr de faire
Recommended Posts