Controlling test reruns with Luigi + pytest

This article is the day-25 entry in the Python Advent Calendar 2015 on Adventar.

In this article, I describe rerunning pytest tests as a simple example of building a job pipeline with Luigi.

What is Luigi

Luigi is a tool for building job pipelines, written in Python. With Luigi, the following elements of a job pipeline can be expressed in Python code.

- Task execution
- Dependencies between tasks
- Saving of task execution results

Its main target appears to be pipelines made of tasks that take a fair amount of time, such as running Hadoop or Spark jobs and loading data from or into databases, and modules for integrating with those tools are supported by default.

Admittedly, Luigi is overkill for the example in this article, but I saw value in being able to extend the pipeline within a fixed framework, so I decided to use it, partly to get familiar with it.

Task definition

The basics of Luigi's task definition are as follows.

- Define a class that inherits from luigi.Task.
- Define the following methods in that class:
  - run(self): the task's execution logic
  - requires(self): the task's dependencies
  - output(self): where the task's execution result is saved
- If the task takes arguments, declare them as class variables using luigi.Parameter() or luigi.<Type>Parameter() (e.g. luigi.IntParameter()).

The following is the task definition for executing pytest.

Task of running pytest

root = os.path.normpath(os.path.abspath(os.path.dirname(__file__)))

class PytestTask(luigi.Task):

    #Task arguments
    pytest_args = luigi.Parameter(default='tests')
    repeat_id = luigi.IntParameter()

    #Task execution process
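    def run(self):
        cmd = ['py.test']
        cmd.extend(self.pytest_args.split(' '))

        #Run pytest and echo its output line by line.
        #(The loop body was missing from the listing; printing each line is an assumption.)
        process = Popen(cmd, stdout=PIPE, stderr=PIPE)
        for line in iter(process.stdout.readline, b''):
            print(line.decode('utf-8', 'replace').rstrip())
        process.wait()

        #self.output() returns the target that the execution result is written to.
        #(Assumption: the saved result is pytest's lastfailed file, which the rerun task later reads as JSON.)
        with open(lastfailed) as f:
            with self.output().open('w') as out:
                out.write(f.read())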

    #Task dependencies
    #Returns a list of the tasks this task depends on (e.g. return [A(), B()]).
    #Here, for various reasons, it returns an empty list.
    def requires(self):
        return []

    #Where the task's execution result is saved
    #Returns an instance of a class derived from luigi.Target. In the following example, the execution result is saved in the local file system.
    def output(self):
        return luigi.LocalTarget('test_repeat_{0}.txt'.format(self.repeat_id))
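
One detail of the run method above is that it splits pytest_args on single spaces, so a quoted argument containing spaces would be broken apart. The following is a minimal sketch of an alternative using shlex.split from the standard library. The helper name build_pytest_command and the sample -k expression are hypothetical and are not part of the original program.

```python
import shlex


def build_pytest_command(pytest_args):
    """Build the argument list for a pytest subprocess (hypothetical helper)."""
    # shlex.split honours shell-style quoting, unlike str.split(' ')
    return ['py.test'] + shlex.split(pytest_args)


# Naive splitting breaks the quoted -k expression into three pieces
naive = 'tests -k "login or logout"'.split(' ')
# shlex.split keeps the quoted expression together as one argument
quoted = build_pytest_command('tests -k "login or logout"')

print(naive)
print(quoted)
```

For simple argument strings such as 'tests --lf', both approaches give the same result, so this only matters once quoted arguments are passed in.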

Dynamic definition of task dependencies

This time, I wanted to build a pipeline that not only runs pytest from Luigi, but also meets the following requirements and automatically reruns the test.

- If not all tests pass, rerun them. The upper limit on the number of reruns is given as an argument.
- When rerunning, use the --lf option to run only the tests that failed.

Besides the static dependencies declared with requires(self), described above, Luigi can also add task dependencies dynamically, depending on conditions at run time.

Task that reruns the tests when the preceding run did not succeed
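
In Luigi, a dynamic dependency is declared by yielding a task instance from run(), and the value of the yield expression is that task's output. The control flow can be illustrated without Luigi at all, using a plain generator and a small driver. This is only a toy model under stated assumptions: toy_run, drive, and the 'attempt-N' labels are hypothetical names, and Luigi's real worker does considerably more (scheduling, completeness checks, and possibly re-running run() from the start).

```python
def toy_run(max_runs):
    """Mimics the shape of a run() method that yields dynamic dependencies."""
    for attempt in range(1, max_runs + 1):
        # Each yield hands a "task" to the driver and receives its result back
        passed = yield 'attempt-{0}'.format(attempt)
        if passed:
            # Stop asking for more runs once an attempt has passed
            return


def drive(gen, results):
    """Hypothetical stand-in for a scheduler: executes each yielded task in turn."""
    executed = []
    try:
        task = next(gen)
        while True:
            executed.append(task)
            # Send the recorded result of this task back into the generator
            task = gen.send(results[task])
    except StopIteration:
        pass
    return executed


# The first attempt fails and the second passes, so the third is never requested
results = {'attempt-1': False, 'attempt-2': True, 'attempt-3': True}
executed = drive(toy_run(3), results)
print(executed)
```

The point of the sketch is that the set of dependencies is not known up front: whether a further attempt is requested depends on the result of the previous one.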

#A file that records tests that failed during the last run
lastfailed = '.cache/v/cache/lastfailed'

class RepeatPytestTask(luigi.Task):

    pytest_args = luigi.Parameter(default='tests')
    repeat = luigi.IntParameter(default=1)

    def is_success(self, target):
        #target is the output of PytestTask
        with target.open('r') as i:
            #If all are successful, an empty dictionary will be generated
            success = not json.load(i)
        return success

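    def run(self):
        #Run once and finish if successful
        #(The arguments of the first PytestTask and the lines that record the result
        # were missing from the listing; they are reconstructed here as assumptions.)
        out = self.output().open('w')
        target = yield PytestTask(
                pytest_args=self.pytest_args,
                repeat_id=1)
        if self.is_success(target):
            out.write('success')
            out.close()
            return

        #Execute with the --lf option from the second run onwards
        for i in range(0, self.repeat - 1):
            #Yielding a task instance adds a dynamic dependency
            target = yield PytestTask(
                    pytest_args='{0} --lf'.format(self.pytest_args),
                    repeat_id=i + 2)
            #Execution ends when successful
            if self.is_success(target):
                out.write('success')
                out.close()
                return
        #Failures remained until the end
        out.write('failure')
        out.close()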

    def output(self):
        return luigi.LocalTarget('test_repeats.txt')
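
The success check above relies on the format of pytest's lastfailed file, which is a JSON object keyed by test node ID and is empty when nothing failed. The check can be tried in isolation with in-memory streams. This is a minimal sketch: the function name all_tests_passed and the sample node ID are hypothetical.

```python
import io
import json


def all_tests_passed(stream):
    """Return True when a lastfailed-style JSON stream holds no entries."""
    # An empty JSON object is falsy, so "not" turns it into True
    return not json.load(stream)


# A lastfailed file with one failing test (the node ID is made up)
failing = io.StringIO('{"tests/test_sample.py::test_x": true}')
# A lastfailed file after a fully successful run
clean = io.StringIO('{}')

failing_result = all_tests_passed(failing)
clean_result = all_tests_passed(clean)
print(failing_result)
print(clean_result)
```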

Pipeline execution

The complete program, including the task definitions above and the pipeline startup code, is as follows.

import json
import os
import sys
from contextlib import contextmanager
from subprocess import Popen, PIPE

import luigi

root = os.path.normpath(os.path.abspath(os.path.dirname(__file__)))
lastfailed = '.cache/v/cache/lastfailed'

class PytestTask(luigi.Task):

    pytest_args = luigi.Parameter(default='tests')
    repeat_id = luigi.IntParameter()

    def output(self):
        return luigi.LocalTarget('test_repeat_{0}.txt'.format(self.repeat_id))

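    def run(self):
        cmd = ['py.test']
        cmd.extend(self.pytest_args.split(' '))

        #Run pytest and echo its output (the loop body is reconstructed as an assumption)
        process = Popen(cmd, stdout=PIPE, stderr=PIPE)
        for line in iter(process.stdout.readline, b''):
            print(line.decode('utf-8', 'replace').rstrip())
        process.wait()

        #Save the lastfailed file as this task's result (reconstructed as an assumption)
        with open(lastfailed) as f:
            with self.output().open('w') as out:
                out.write(f.read())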

class RepeatPytestTask(luigi.Task):
    pytest_args = luigi.Parameter(default='tests')
    #The number of repetitions is given as an argument from the outside
    repeat = luigi.IntParameter(default=1)

    def is_success(self, target):
        with target.open('r') as i:
            success = not json.load(i)
        return success

    def output(self):
        return luigi.LocalTarget('test_repeats.txt')

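    def run(self):
        #(The first PytestTask's arguments and the result-writing lines are reconstructed as assumptions)
        out = self.output().open('w')
        target = yield PytestTask(
                pytest_args=self.pytest_args,
                repeat_id=1)
        if self.is_success(target):
            out.write('success')
            out.close()
            return

        for i in range(0, self.repeat - 1):
            target = yield PytestTask(
                    pytest_args='{0} --lf'.format(self.pytest_args),
                    repeat_id=i + 2)
            if self.is_success(target):
                out.write('success')
                out.close()
                return
        out.write('failure')
        out.close()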

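#Pipeline startup process
if __name__ == '__main__':
    argv = ['RepeatPytestTask']
    if len(sys.argv) > 1:
        #Forward command-line options such as --repeat to Luigi
        #(this line and the luigi.run call were missing from the listing and are reconstructed)
        argv.extend(sys.argv[1:])
    luigi.run(argv)

Running the above program with the number of repetitions (--repeat) gives a test pipeline that automatically reruns the tests when they fail.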

Job pipeline execution

#Luigi considers a task complete once the target returned by output(self) exists.
#If you want to execute the tasks from the beginning, delete all the outputs.
$ rm -rf test_repeat_1.txt test_repeats.txt test_repeat_2.txt

#For large-scale processing, a separate task scheduler can be set up.
#This is a small job, so it is scheduled locally (--local-scheduler option).
$ python --local-scheduler --repeat 3
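
Because leftover output files make Luigi treat the tasks as already complete, it can be convenient to clear them from Python rather than listing each file by hand. The following is a minimal sketch using only the standard library. The helper name remove_outputs is hypothetical, and the file name patterns are taken from the output() methods of the two tasks above.

```python
import glob
import os


def remove_outputs(directory='.'):
    """Delete the output files of both tasks and return the removed file names."""
    # Patterns match PytestTask outputs (test_repeat_<id>.txt)
    # and the RepeatPytestTask output (test_repeats.txt)
    patterns = ['test_repeat_*.txt', 'test_repeats.txt']
    removed = []
    for pattern in patterns:
        for path in glob.glob(os.path.join(directory, pattern)):
            os.remove(path)
            removed.append(os.path.basename(path))
    return sorted(removed)
```

Calling remove_outputs() before starting the pipeline has the same effect as the rm command above, without having to know how many reruns took place the last time.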
