Utilisez des programmes Python avec le plugin de sortie exec de fluentd

Cela fait une semaine que je l'ai utilisé dans des batailles réelles, mais fluentd est très pratique. Ensuite, essayez de transmettre les données au programme Python avec le plugin de sortie exec.

Commande de démarrage

Voici un exemple de Python dans la documentation (http://docs.fluentd.org/articles/out_exec):

td-agent.conf


<match fizzbuzz>
  type exec
  command python /path/to/fizzbuzz.py
  buffer_path    /path/to/buffer_path
  format tsv
  keys fizzbuzz
  flush_interval 5s # for debugging/checking
</match>

Cependant, quand je l'utilise réellement, je veux spécifier le Python que j'ai mis dans virtualenv, et je me demandais ce qui était arrivé à PYTHONPATH, alors j'ai essayé de spécifier command```.

td-agent.conf


  #python spécifie ce que vous mettez dans virtualenv
  #Mon code myapp est configuré.Écrivez py et installez le package à l'avance
  command /path/to/myapp/bin/python −m myapp.command.xxxx arg1

Vous n'avez plus à vous soucier de PYTHONPATH et vous n'avez pas à écrire un long chemin de fichier. Écrivons setup.py sans aucun problème.

Essayer de définir une variable d'environnement

td-agent.conf


  #Spécifiez la variable d'environnement(Obtenez une erreur)
  command HOGE=FUGA /path/to/myapp/bin/python -m myapp.command.xxx arg1

Ensuite, j'ai eu l'erreur suivante. Si vous voulez transmettre quelque chose, utilisez l'argument. J'ai vu 32512 pour la première fois.

td-agent.log


2014-08-03 17:59:54 +0900 [warn]: temporarily failed to flush the buffer. next_retry=2014-08-03 17:59:57 +0900 error_class="RuntimeError" error="command returns 32512: HOGE=FUGA /path/to/myapp/bin/python /path/to/myapp/myapp/command/xxx.py /path/to/buffer_path/.20140803.q4ffb5d85bf81f0d4.log"

Dossier d'exécution

Le chemin du fichier tampon est passé à la fin de l'argument. Le code qui lit et traite ligne par ligne ressemble à ceci. Si vous souhaitez appeler chaque processus à partir du code de test, séparez les méthodes. Si vous effectuez un traitement en continu de la lecture du fichier au traitement final, vous pouvez écrire magnifiquement, et avec PyMongo, vous pouvez passer le générateur tel quel à la méthode d'insertion de Collection, les performances sont donc également bonnes.

/path/to/myapp/myapp/command/xxx.py


# coding=utf-8

import json
import logging
import logging.config
import traceback

logging.config.fileConfig('logging.conf')
logger = logging.getLogger('fluent-exec-out')

def main():
	file_path = parse_args()
    do_something(exclude_error_row(parse(readline(file_path))))

def parse_args():
	return sys.argv[-1]

def readline(file_path):
    with file(file_path) as input:
        for line in input:
            yield line

def parse(lines):
	#Lorsque le format d'entrée est json
    for line in lines:
        yield json.loads(line)

def exclude_error_row(rows):
    for row in rows:
        #Validation et sortie du journal
        if xxxxxx:
			logger.warn("Invalid line found %s", row)
			continue
       yield row

def do_something(rows):
	#Faire quelque chose


if __name__ == '__main__':
	logger.info('Start')
	try:
		main()
	except:
		logger.error(traceback.format_exc())
	logger.info('End')

Code de test

Pour tester la méthode principale, créez un fichier au même format que le fichier tampon transmis par fluentd et transmettez-le comme argument.

test_xxx.py


# coding=utf-8

from nose.tools import ok_, eq_

from myapp.command import xxx

original_argv = sys.argv

class TestAll(object):
	def teardown(self):
		sys.argv = original_argv

	def test_main(self):
		sys.argv.append('./tests/data/fluentd_buffer.log')
		xxx.main()
		#Tester quelque chose

	def test_readline(self):
		gen = xxx.readline('./tests/data/fluentd_buffer.log'):
		#Tester quelque chose

Sortie de journal

Il pourrait être cool d'utiliser quelque chose comme fluent-logger-python et de transmettre le journal à fluentd.

Filtration

Dans le code ci-dessus, le filtrage est également effectué, mais il peut être intéressant pour fluentd de le faire avec la correspondance précédente en utilisant le plugin de sortie exec_filter. Dans tous les cas, c'est une question de goût.

S'il s'agit de traitement de filtre, je pense que vous pouvez le combiner avec une seule sortie exécutable, mais si vous emballez trop de processus, vous serez plus inquiet de ce qu'il faut faire si les processus sont en mousse au milieu. Il serait préférable de séparer la sortie exec en appels de code séparés.

Recommended Posts

Utilisez des programmes Python avec le plugin de sortie exec de fluentd
Utilisez des programmes Python avec le plugin de sortie exec_filter de fluentd
[Python] Un programme qui crée des escaliers avec #
Un programme qui utilise Python pour lire des fichiers indésirables
[Python] Générer ValueObject avec un constructeur complet à l'aide de classes de données
Enregistrez des tickets avec l'API de Redmine en utilisant des requêtes Python
[Python] Créez un fichier de distribution pour le programme Tkinter avec cx_Freeze
Créer un plugin Wox (Python)
[S3] CRUD avec S3 utilisant Python [Python]
Utilisation de Quaternion avec Python ~ numpy-quaternion ~
Essayez la sortie Python avec Haxe 3.2
[Python] Utilisation d'OpenCV avec Python (basique)
Utiliser une imprimante avec Debian 10
Faites une loterie avec Python
Créer un répertoire avec python
Utiliser OpenCV avec Python @Mac
Envoyer en utilisant Python avec Gmail
J'ai écrit rapidement un programme pour étudier la DI avec Python ①
[Python] Chapitre 01-03 À propos de Python (Ecrire et exécuter un programme à l'aide de PyCharm)
Compléter python avec emacs en utilisant company-jedi
Convertir en chaîne lors de la sortie de la sortie standard avec le sous-processus Python
Moyenne harmonique par Python (en utilisant SciPy)
[Python] Qu'est-ce qu'une instruction with?
Comment démarrer par lots un programme Python créé avec le notebook Jupyter
Résoudre ABC163 A ~ C avec Python
Contrôlez le moteur avec un pilote de moteur en utilisant python sur Raspberry Pi 3!
Faites fonctionner l'imprimante de reçus avec python
Manuel de graphisme Python avec Matplotlib.
[Python] Utilisation d'OpenCV avec Python (filtrage d'image)
J'ai fait un Line-bot avec Python!
Utilisation de Rstan de Python avec PypeR
Créer une interface graphique python à l'aide de tkinter
[Python] Utilisation d'OpenCV avec Python (transformation d'image)
Faisons une interface graphique avec python.
Dessiner une courbe Silverstone en utilisant Python
Résoudre ABC166 A ~ D avec Python
[Python] Utilisation d'OpenCV avec Python (détection des bords)
Sortie vers un fichier csv avec Python
Entrée / sortie avec Python (mémo d'apprentissage Python ⑤)
Déboguer un programme multi-processus python avec VSCode
Créer un environnement virtuel avec Python 3
Installer le plug-in Python avec Netbeans 8.0.2
Résoudre ABC168 A ~ C avec Python
[Note] Sortie Hello world avec python
Sortie du journal de test unitaire avec python
Créer un système de recommandation avec python
[Ev3dev] Créez un programme qui capture LCD (écran) en utilisant python
Lisez le fichier en Python avec un chemin relatif depuis le programme
Ecrire le plugin vim en Python
[Python] Générer un mot de passe avec Slackbot
J'ai essayé de créer une application todo en utilisant une bouteille avec python
Résoudre ABC167 A ~ C avec Python
Résoudre ABC158 A ~ C avec Python
[Python] Lire un fichier csv avec une grande taille de données à l'aide d'un générateur
[Remarque] Utilisation d'un écran LCD à 16 caractères à 2 chiffres (1602A) de Python avec Raspeye
Sortie interactive de BPE à l'aide de curses python
Créez un environnement d'exécution Python à l'aide de GPU avec GCP Compute Engine
J'ai créé un chat-holdem de serveur de jeu de poker en utilisant websocket avec python
Faisons une discussion WEB en utilisant WebSocket avec AWS sans serveur (Python)!