Cela fait une semaine que je l'ai utilisé dans des batailles réelles, mais fluentd est très pratique. Ensuite, essayez de transmettre les données au programme Python avec le plugin de sortie exec.
Voici un exemple de Python dans la documentation (http://docs.fluentd.org/articles/out_exec):
td-agent.conf
<match fizzbuzz>
type exec
command python /path/to/fizzbuzz.py
buffer_path /path/to/buffer_path
format tsv
keys fizzbuzz
flush_interval 5s # for debugging/checking
</match>
Cependant, quand je l'utilise réellement, je veux spécifier le Python que j'ai mis dans virtualenv, et je me demandais ce qui était arrivé à PYTHONPATH, alors j'ai essayé de spécifier
command```.
td-agent.conf
#python spécifie ce que vous mettez dans virtualenv
#Mon code myapp est configuré.Écrivez py et installez le package à l'avance
command /path/to/myapp/bin/python −m myapp.command.xxxx arg1
Vous n'avez plus à vous soucier de PYTHONPATH et vous n'avez pas à écrire un long chemin de fichier. Écrivons setup.py sans aucun problème.
Essayer de définir une variable d'environnement
td-agent.conf
#Spécifiez la variable d'environnement(Obtenez une erreur)
command HOGE=FUGA /path/to/myapp/bin/python -m myapp.command.xxx arg1
Ensuite, j'ai eu l'erreur suivante. Si vous voulez transmettre quelque chose, utilisez l'argument. J'ai vu 32512 pour la première fois.
td-agent.log
2014-08-03 17:59:54 +0900 [warn]: temporarily failed to flush the buffer. next_retry=2014-08-03 17:59:57 +0900 error_class="RuntimeError" error="command returns 32512: HOGE=FUGA /path/to/myapp/bin/python /path/to/myapp/myapp/command/xxx.py /path/to/buffer_path/.20140803.q4ffb5d85bf81f0d4.log"
Le chemin du fichier tampon est passé à la fin de l'argument. Le code qui lit et traite ligne par ligne ressemble à ceci. Si vous souhaitez appeler chaque processus à partir du code de test, séparez les méthodes. Si vous effectuez un traitement en continu de la lecture du fichier au traitement final, vous pouvez écrire magnifiquement, et avec PyMongo, vous pouvez passer le générateur tel quel à la méthode d'insertion de Collection, les performances sont donc également bonnes.
/path/to/myapp/myapp/command/xxx.py
# coding=utf-8
import json
import logging
import logging.config
import traceback
logging.config.fileConfig('logging.conf')
logger = logging.getLogger('fluent-exec-out')
def main():
file_path = parse_args()
do_something(exclude_error_row(parse(readline(file_path))))
def parse_args():
return sys.argv[-1]
def readline(file_path):
with file(file_path) as input:
for line in input:
yield line
def parse(lines):
#Lorsque le format d'entrée est json
for line in lines:
yield json.loads(line)
def exclude_error_row(rows):
for row in rows:
#Validation et sortie du journal
if xxxxxx:
logger.warn("Invalid line found %s", row)
continue
yield row
def do_something(rows):
#Faire quelque chose
if __name__ == '__main__':
logger.info('Start')
try:
main()
except:
logger.error(traceback.format_exc())
logger.info('End')
Pour tester la méthode principale, créez un fichier au même format que le fichier tampon transmis par fluentd et transmettez-le comme argument.
test_xxx.py
# coding=utf-8
from nose.tools import ok_, eq_
from myapp.command import xxx
original_argv = sys.argv
class TestAll(object):
def teardown(self):
sys.argv = original_argv
def test_main(self):
sys.argv.append('./tests/data/fluentd_buffer.log')
xxx.main()
#Tester quelque chose
def test_readline(self):
gen = xxx.readline('./tests/data/fluentd_buffer.log'):
#Tester quelque chose
Il pourrait être cool d'utiliser quelque chose comme fluent-logger-python et de transmettre le journal à fluentd.
Dans le code ci-dessus, le filtrage est également effectué, mais il peut être intéressant pour fluentd de le faire avec la correspondance précédente en utilisant le plugin de sortie exec_filter. Dans tous les cas, c'est une question de goût.
S'il s'agit de traitement de filtre, je pense que vous pouvez le combiner avec une seule sortie exécutable, mais si vous emballez trop de processus, vous serez plus inquiet de ce qu'il faut faire si les processus sont en mousse au milieu. Il serait préférable de séparer la sortie exec en appels de code séparés.
Recommended Posts