Ceci est l'article sur le 25ème jour du Calendrier de l'Avent Python 2015 de Adventar.
Cet article décrit la réexécution du test Pytest comme un exemple simple de création d'un pipeline de travaux à l'aide de Luigi.
Luigi est un outil de construction de pipeline d'emplois créé par Python. En utilisant Luigi, les éléments suivants nécessaires à la création d'un pipeline de travaux peuvent être exprimés en code Python.
Il semble que l'objectif principal soit de créer un pipeline de travaux avec des tâches qui prennent un certain temps, telles que l'exécution de travaux de Hadoop et Spark, le chargement de données de la base de données vers /, etc., et un module de liaison avec ces outils. Est pris en charge par défaut (http://luigi.readthedocs.org/en/stable/api/luigi.contrib.html#submodules).
C'est un fait indéniable qu'il s'agit d'une épée de vache dans l'exemple pris ici, mais j'ai ressenti le mérite de pouvoir étendre le pipeline dans un cadre fixe, alors j'ai décidé de l'utiliser avec le sentiment de m'y habituer.
Les bases de la définition des tâches de Luigi sont les suivantes.
--Définissez une classe qui hérite de luigi.Task
.
--Définissez la méthode suivante dans une classe qui hérite de luigi.Task
.
--run (self): processus d'exécution de la tâche
--requires (self): dépendances de tâches
--output (self): sauvegarde le traitement des résultats de l'exécution des tâches
luigi.Parameter ()
ou luigi. <Type: ex. Int> Parameter ()
dans la variable de classe.Voici la définition de la tâche pour exécuter pytest
.
Tâche d'exécution de pytest
root = os.path.normpath(os.path.abspath(os.path.dirname(__file__)))
class PytestTask(luigi.Task):
#Arguments de tâche
pytest_args = luigi.Parameter(default='tests')
repeat_id = luigi.IntParameter()
#Processus d'exécution des tâches
def run(self):
cmd = ['py.test']
cmd.extend(self.pytest_args.split(' '))
os.chdir(root)
process = Popen(cmd, stdout=PIPE, stderr=PIPE)
for line in iter(process.stdout.readline, ''):
print(line.rstrip())
# self.output()Vous pouvez obtenir le flux à partir duquel écrire le résultat de l'exécution.
out = self.output().open('w')
with open(lastfailed) as f:
out.write(f.read())
out.close()
#Dépendances de tâches
#Renvoie une liste de tâches dépendantes.(ex. return [A(), B()])
#Cette fois, pour diverses raisons, une liste vide(
def requires(self):
return []
#Enregistrer le traitement des résultats de l'exécution des tâches
# luigi.Renvoie une classe dérivée de Target. Dans l'exemple suivant, le résultat de l'exécution est enregistré dans le système de fichiers local.
# (ex) http://luigi.readthedocs.org/en/stable/api/luigi.html#luigi.Target
def output(self):
return luigi.LocalTarget('test_repeat_{0}.txt'.format(self.repeat_id))
Cette fois, je voulais construire un pipeline qui non seulement exécute pytest
de Luigi, mais répond également aux exigences suivantes et réexécute automatiquement le test.
--lf
pour exécuter uniquement le test ayant échoué. ([Référence](http://qiita.com/FGtatsuro/items/0efebb9b58374d16c5f0#%E5%89%8D%E5%9B%9E%E3%81%AE%E5%AE%9F%E8%A1%8C% E3% 81% A7% E5% A4% B1% E6% 95% 97% E3% 81% 97% E3% 81% 9F% E3% 83% 86% E3% 82% B9% E3% 83% 88% E3% 81% AE% E3% 81% BF% E5% 86% 8D% E5% AE% 9F% E8% A1% 8C% E3% 81% 99% E3% 82% 8B))Luigi peut non seulement ajouter des dépendances statiques par requires (self)
mentionné ci-dessus, mais aussi ajouter dynamiquement des dépendances de tâches en fonction des conditions.
Tâche à réexécuter si le test n'a pas réussi
#Un fichier qui enregistre les tests qui ont échoué lors de la dernière exécution
lastfailed = '.cache/v/cache/lastfailed'
class RepeatPytestTask(luigi.Task):
pytest_args = luigi.Parameter(default='tests')
repeat = luigi.IntParameter(default=1)
def is_success(self, target):
i = target.open('r')
#Si tout réussit, un dictionnaire vide sera généré
success = bool(not json.load(i))
i.close()
return success
def run(self):
#Exécuter une fois et terminer en cas de succès
out = self.output().open('w')
target = yield PytestTask(
pytest_args=self.pytest_args,
repeat_id=1)
if self.is_success(target):
out.write('success')
out.close()
return
#Exécuter avec l'option lf à partir de la deuxième fois
for i in range(0, self.repeat - 1):
# yield <Instance de tâche>Peut ajouter des dépendances dynamiques avec
target = yield PytestTask(
pytest_args='{0} --lf'.format(self.pytest_args),
repeat_id=i + 2)
#L'exécution se termine en cas de succès
if self.is_success(target):
out.write('success')
out.close()
return
#L'échec est resté jusqu'à la fin
out.write('failure')
out.close()
def output(self):
return luigi.LocalTarget('test_repeats.txt')
En plus de la définition de tâche décrite ci-dessus, le programme entier qui inclut le processus de démarrage du pipeline est le suivant.
pytest_pipeline.py
import json
import os
import sys
from contextlib import contextmanager
from subprocess import Popen, PIPE
import luigi
root = os.path.normpath(os.path.abspath(os.path.dirname(__file__)))
lastfailed = '.cache/v/cache/lastfailed'
class PytestTask(luigi.Task):
pytest_args = luigi.Parameter(default='tests')
repeat_id = luigi.IntParameter()
def output(self):
return luigi.LocalTarget('test_repeat_{0}.txt'.format(self.repeat_id))
def run(self):
cmd = ['py.test']
cmd.extend(self.pytest_args.split(' '))
os.chdir(root)
process = Popen(cmd, stdout=PIPE, stderr=PIPE)
for line in iter(process.stdout.readline, ''):
print(line.rstrip())
out = self.output().open('w')
with open(lastfailed) as f:
out.write(f.read())
out.close()
class RepeatPytestTask(luigi.Task):
pytest_args = luigi.Parameter(default='tests')
#Le nombre de répétitions est donné en argument de l'extérieur
repeat = luigi.IntParameter(default=1)
def is_success(self, target):
i = target.open('r')
success = bool(not json.load(i))
i.close()
return success
def output(self):
return luigi.LocalTarget('test_repeats.txt')
def run(self):
out = self.output().open('w')
target = yield PytestTask(
pytest_args=self.pytest_args,
repeat_id=1)
if self.is_success(target):
out.write('success')
out.close()
return
for i in range(0, self.repeat - 1):
target = yield PytestTask(
pytest_args='{0} --lf'.format(self.pytest_args),
repeat_id=i + 2)
if self.is_success(target):
out.write('success')
out.close()
return
out.write('failure')
out.close()
#Processus de démarrage du pipeline
if __name__ == '__main__':
argv = ['RepeatPytestTask']
if len(sys.argv) > 1:
argv.extend(sys.argv[1:])
luigi.run(argv)
En donnant au programme ci-dessus le nombre de répétitions (--repeat
) et en l'exécutant, un pipeline de test qui se ré-exécute automatiquement en cas d'échec peut être réalisé.
Exécution du pipeline de travaux
#Luigi est sorti(self)Est sortie=Considérez la tâche à accomplir.
#Si vous souhaitez exécuter la tâche depuis le début, supprimez toutes les sorties.
$ rm -rf test_repeat_1.txt test_repeats.txt test_repeat_2.txt
#Pour un traitement à grande échelle, un planificateur de tâches peut être créé séparément.
#Cette fois, c'est un petit processus, alors planifiez-le localement(--local-option de planificateur)
# http://luigi.readthedocs.org/en/stable/central_scheduler.html?highlight=scheduler%20server
$ python pytest_pipeline.py --local-scheduler --repeat 3
Recommended Posts