This article is the day-25 entry of the Python Advent Calendar 2015 on Adventar.
In this article, I describe rerunning pytest tests as a simple example of building a job pipeline with Luigi.
Luigi is a job pipeline construction tool written in Python. With Luigi, the following things needed to build a job pipeline can be expressed in Python code.
- Task execution
- Dependencies between tasks
- Saving of task execution results
Its main target appears to be job pipelines made of tasks that take a fair amount of time, such as running Hadoop or Spark jobs or loading data to and from databases, and modules for integrating with those tools are provided by default (http://luigi.readthedocs.org/en/stable/api/luigi.contrib.html#submodules).
Luigi is admittedly overkill for the example in this article, but I saw merit in being able to extend the pipeline within a fixed framework, so I used it here partly to get familiar with it.
The basics of Luigi's task definition are as follows.
- Define a class that inherits from luigi.Task.
- Define the following methods in that class:
  - run(self): the task's execution logic
  - requires(self): the task's dependencies
  - output(self): where the task's execution result is saved
- If the task takes arguments, declare them as class variables using luigi.Parameter() or a typed variant such as luigi.IntParameter().
The following is the task definition for running pytest.
Task that runs pytest
root = os.path.normpath(os.path.abspath(os.path.dirname(__file__)))

class PytestTask(luigi.Task):
    # Task arguments
    pytest_args = luigi.Parameter(default='tests')
    repeat_id = luigi.IntParameter()

    # Task execution logic
    def run(self):
        cmd = ['py.test']
        cmd.extend(self.pytest_args.split(' '))
        os.chdir(root)
        # universal_newlines=True makes readline() return str, so the ''
        # sentinel also ends the loop on Python 3. stderr is left attached
        # to the terminal so that an unread pipe cannot block pytest.
        process = Popen(cmd, stdout=PIPE, universal_newlines=True)
        for line in iter(process.stdout.readline, ''):
            print(line.rstrip())
        process.wait()
        # self.output() gives the target whose stream receives the result.
        # lastfailed is the path of pytest's last-failed record (defined in
        # the second listing of this article).
        with self.output().open('w') as out, open(lastfailed) as f:
            out.write(f.read())

    # Task dependencies
    # Returns a list of the tasks this task depends on (ex. return [A(), B()]).
    # In this example it returns an empty list.
    def requires(self):
        return []

    # Where the task's execution result is saved
    # Returns an instance of a class derived from luigi.Target.
    # In this example the result is saved to the local file system.
    # (ex) http://luigi.readthedocs.org/en/stable/api/luigi.html#luigi.Target
    def output(self):
        return luigi.LocalTarget('test_repeat_{0}.txt'.format(self.repeat_id))
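The run() method above builds the command line by splitting pytest_args on single spaces, which breaks quoted arguments apart. As a minimal sketch (not the author's code), the standard library's shlex.split could be used instead. The helper name build_pytest_command is hypothetical and only serves the illustration.

```python
import shlex


def build_pytest_command(pytest_args):
    """Return the argv list for one pytest run (hypothetical helper)."""
    cmd = ['py.test']
    # shlex.split keeps a quoted argument such as -k "a and b" together,
    # whereas str.split(' ') would cut it into separate words.
    cmd.extend(shlex.split(pytest_args))
    return cmd


simple = build_pytest_command('tests')
quoted = build_pytest_command('tests -k "login and not slow"')
print(simple)
print(quoted)
```

The resulting list can be passed to Popen in the same way as the list built in run().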
This time, I wanted to build a pipeline that not only runs pytest from Luigi, but also meets the following requirements and automatically reruns the tests.
- If not all tests pass, rerun them. The maximum number of reruns is given as an argument.
- When rerunning, use the --lf option to run only the tests that failed. ([Reference](http://qiita.com/FGtatsuro/items/0efebb9b58374d16c5f0#%E5%89%8D%E5%9B%9E%E3%81%AE%E5%AE%9F%E8%A1%8C%E3%81%A7%E5%A4%B1%E6%95%97%E3%81%97%E3%81%9F%E3%83%86%E3%82%B9%E3%83%88%E3%81%AE%E3%81%BF%E5%86%8D%E5%AE%9F%E8%A1%8C%E3%81%99%E3%82%8B))
Luigi can not only declare static dependencies through requires(self), as described above, but also add task dependencies dynamically depending on conditions.
Task that reruns the tests if the previous run did not succeed
# File in which pytest records the tests that failed during the last run
lastfailed = '.cache/v/cache/lastfailed'

class RepeatPytestTask(luigi.Task):
    pytest_args = luigi.Parameter(default='tests')
    repeat = luigi.IntParameter(default=1)

    def is_success(self, target):
        # If every test passed, the file contains an empty dictionary
        with target.open('r') as i:
            return not json.load(i)

    def run(self):
        # Run once and finish if successful
        target = yield PytestTask(
            pytest_args=self.pytest_args,
            repeat_id=1)
        success = self.is_success(target)
        # From the second run onwards, run with the --lf option
        for i in range(0, self.repeat - 1):
            # Stop as soon as a run succeeds
            if success:
                break
            # `yield <task instance>` adds a dynamic dependency
            target = yield PytestTask(
                pytest_args='{0} --lf'.format(self.pytest_args),
                repeat_id=i + 2)
            success = self.is_success(target)
        # The output is opened only here because Luigi may restart run()
        # from the beginning after a yielded dependency completes.
        # 'failure' means failures remained until the last run.
        with self.output().open('w') as out:
            out.write('success' if success else 'failure')

    def output(self):
        return luigi.LocalTarget('test_repeats.txt')
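The yield-based control flow above can be tried out without Luigi. The following is a minimal sketch under stated assumptions: run_with_retries and drive are hypothetical names, drive is only a toy stand-in for a scheduler, and a "request" is a plain dictionary rather than a task instance. Unlike this toy, which keeps the generator alive between requests, Luigi's documentation notes that run() may be resumed from scratch after a dynamic dependency, which the sketch does not model.

```python
def run_with_retries(max_runs):
    """Generator shaped like RepeatPytestTask.run (hypothetical sketch)."""
    for attempt in range(1, max_runs + 1):
        # The first attempt runs everything, later ones only failed tests.
        # The value sent back plays the role of the lastfailed dictionary.
        failed = yield {'repeat_id': attempt, 'last_failed_only': attempt > 1}
        if not failed:
            return 'success'
    return 'failure'


def drive(generator, answers):
    """Toy scheduler: answer each yielded request with a canned result."""
    requests = []
    try:
        request = next(generator)
        for answer in answers:
            requests.append(request)
            request = generator.send(answer)
    except StopIteration as stop:
        # A generator's return value travels in StopIteration.value.
        return stop.value, requests
    return None, requests


# The first run reports one failing test, the rerun reports none.
outcome, requests = drive(
    run_with_retries(3),
    [{'tests/test_a.py::test_x': True}, {}])
print(outcome, requests)
```

An empty dictionary sent back ends the loop early, mirroring how is_success treats an empty lastfailed file.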
In addition to the task definition described above, the entire program that includes the pipeline startup process is as follows.
pytest_pipeline.py
import json
import os
import sys
from subprocess import Popen, PIPE

import luigi

root = os.path.normpath(os.path.abspath(os.path.dirname(__file__)))
lastfailed = '.cache/v/cache/lastfailed'


class PytestTask(luigi.Task):
    pytest_args = luigi.Parameter(default='tests')
    repeat_id = luigi.IntParameter()

    def output(self):
        return luigi.LocalTarget('test_repeat_{0}.txt'.format(self.repeat_id))

    def run(self):
        cmd = ['py.test']
        cmd.extend(self.pytest_args.split(' '))
        os.chdir(root)
        # universal_newlines=True makes readline() return str, so the ''
        # sentinel also ends the loop on Python 3.
        process = Popen(cmd, stdout=PIPE, universal_newlines=True)
        for line in iter(process.stdout.readline, ''):
            print(line.rstrip())
        process.wait()
        with self.output().open('w') as out, open(lastfailed) as f:
            out.write(f.read())


class RepeatPytestTask(luigi.Task):
    pytest_args = luigi.Parameter(default='tests')
    # The number of repetitions is given as an argument from the outside
    repeat = luigi.IntParameter(default=1)

    def is_success(self, target):
        with target.open('r') as i:
            return not json.load(i)

    def output(self):
        return luigi.LocalTarget('test_repeats.txt')

    def run(self):
        target = yield PytestTask(
            pytest_args=self.pytest_args,
            repeat_id=1)
        success = self.is_success(target)
        for i in range(0, self.repeat - 1):
            if success:
                break
            target = yield PytestTask(
                pytest_args='{0} --lf'.format(self.pytest_args),
                repeat_id=i + 2)
            success = self.is_success(target)
        # Opened only here because Luigi may restart run() from the
        # beginning after a yielded dependency completes.
        with self.output().open('w') as out:
            out.write('success' if success else 'failure')


# Pipeline startup process
if __name__ == '__main__':
    argv = ['RepeatPytestTask'] + sys.argv[1:]
    luigi.run(argv)
Running the above program with the number of repetitions (--repeat) gives a test pipeline that automatically reruns the tests when they fail.
Job pipeline execution
# Luigi considers a task complete once the target returned by output(self) exists.
# To run the tasks from the beginning, delete all the outputs.
$ rm -rf test_repeat_1.txt test_repeats.txt test_repeat_2.txt
# For large-scale processing, a central scheduler can be set up separately.
# This is a small job, so it is scheduled locally (--local-scheduler option).
# http://luigi.readthedocs.org/en/stable/central_scheduler.html?highlight=scheduler%20server
$ python pytest_pipeline.py --local-scheduler --repeat 3
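Because the number of test_repeat_N.txt files depends on --repeat, listing them by hand in rm can miss some. As a minimal sketch, the cleanup step could also be done from Python with glob patterns that match the file names used by the two tasks above. The function name remove_outputs is hypothetical, and the demo runs in a temporary directory so it does not touch real results.

```python
import glob
import os
import tempfile


def remove_outputs(directory='.'):
    """Delete the pipeline's result files so that every task runs again."""
    removed = []
    # Patterns follow the names returned by output() in the two tasks.
    for pattern in ('test_repeat_*.txt', 'test_repeats.txt'):
        for path in glob.glob(os.path.join(directory, pattern)):
            os.remove(path)
            removed.append(os.path.basename(path))
    return sorted(removed)


# Demo in a throwaway directory: keep.txt must survive the cleanup.
with tempfile.TemporaryDirectory() as tmp:
    for name in ('test_repeat_1.txt', 'test_repeat_2.txt',
                 'test_repeats.txt', 'keep.txt'):
        open(os.path.join(tmp, name), 'w').close()
    removed = remove_outputs(tmp)
    remaining = os.listdir(tmp)
print(removed, remaining)
```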