Decorator that prevents duplicate startup of a Django management command
decorators.py
def find_process_ids(pattern):
    """
    Look up the IDs of processes matching ``pattern``, excluding our own.

    The search is delegated to ``pgrep -f``.

    :param pattern: pattern handed to ``pgrep -f``.
    :return: matching process IDs other than ``os.getpid()``; an empty list
        when ``pgrep`` exits with a non-zero status.
    :rtype: list
    """
    try:
        raw_output = subprocess.check_output(['pgrep', '-f', pattern])
    except subprocess.CalledProcessError:
        # check_output raises on a non-zero exit status, which is how pgrep
        # reports that nothing matched.
        return []

    own_pid = os.getpid()
    matched_pids = (int(line) for line in raw_output.splitlines())
    return [pid for pid in matched_pids if pid != own_pid]
def runs_once(batch_name):
    """
    Prevent a management command from starting while another copy is running.

    Usage::

        @runs_once(__file__)
        def handle(...):
            ...

    ``batch_name`` may be a bare name or the path of the command module.
    A path whose file name ends in ``.py``, ``.pyc`` or ``.pyo`` is reduced
    to the module name (``/app/management/commands/foo.py`` -> ``foo``);
    any other value is used unchanged. The resulting name is the pattern
    passed to ``find_process_ids``.

    :param batch_name: command name, or path of the module defining it.
    :return: a decorator. The wrapped callable prints a message and returns
        ``None`` when ``find_process_ids`` reports a matching process;
        otherwise it calls the original callable and returns its result.
    """
    # Reduce a module path to its bare name. Relative paths, compiled
    # .pyc/.pyo files and names with characters such as "-" are handled as
    # well: leaving the full path in place would make it the search pattern,
    # which does not appear in the command line of another
    # "manage.py <command>" process, so the guard would never trigger.
    stem, extension = os.path.splitext(os.path.basename(batch_name))
    if extension in ('.py', '.pyc', '.pyo'):
        batch_name = stem

    def _inner(func):
        @wraps(func)
        def decorate(*args, **kwargs):
            process_ids = find_process_ids(batch_name)
            if process_ids:
                # Another instance is running: report it and skip this run.
                print('process already exists. {}, {}'.format(
                    batch_name, process_ids)
                )
                return None
            return func(*args, **kwargs)
        return decorate
    return _inner