J'écris un démon qui fonctionne 24 heures sur 24 en Python. J'ai décidé d'ajouter une API REST pour vérifier l'état de fonctionnement et modifier dynamiquement les paramètres. Nous avons cherché un moyen de l'implémenter avec un minimum d'effort sans apporter de changements majeurs à la routine principale.
Vous avez besoin d'une sorte de serveur Web pour fournir l'API REST. Puisqu'il n'est pas ouvert sur l'extérieur, nous utiliserons le serveur Web de développement inclus avec Flask. Bien que ce soit pour le développement, il prend également en charge l'accès simultané avec des threads, ce qui lui permet de résister à une utilisation pratique.
J'ai décidé d'utiliser Flask-API, qui est facile à implémenter et possède même une interface utilisateur de test.
L'expérience suivante utilise l'exemple Flask-API. Copiez-le et enregistrez-le dans un fichier appelé example.py. http://www.flaskapi.org/#example
Pour exécuter la routine principale du démon et du serveur Web en même temps, il semble que vous deviez utiliser asyncio ou threads. Flask n'est pas censé être utilisé avec asyncio, je vais donc utiliser des threads.
Qui devrait prendre l'initiative, la routine principale ou le serveur Web? Étant donné que l'API REST est une fonction supplémentaire, il semble préférable d'utiliser le thread principal comme démon et le thread dérivé comme serveur Web. (Post-scriptum: il peut être sans fondement. Du point de vue de la gestion des démons, le Web est le principal et les threads de travail sont plus naturels pour les démons)
Créez un thread dans la routine principale et démarrez le serveur de Flask à partir de là. Affichez l'heure toutes les secondes pour vous assurer que le thread principal n'est pas bloqué.
$ diff -u example.py example_threaded.py
--- example.py 2016-12-20 16:19:19.000000000 -0800
+++ example_threaded.py 2016-12-20 16:23:43.000000000 -0800
@@ -1,6 +1,13 @@
+import logging
+import threading
+
from flask import request, url_for
from flask.ext.api import FlaskAPI, status, exceptions
+FORMAT = '%(asctime)-15s %(name)s %(threadName)s %(message)s'
+logging.basicConfig(format=FORMAT, level=logging.INFO)
+log = logging.getLogger()
+
app = FlaskAPI(__name__)
@@ -53,5 +60,14 @@
if __name__ == "__main__":
- app.run(debug=True)
+
+ log.info('start')
+ rest_service_thread = threading.Thread(name='reset_service', target=app.run, kwargs=dict(debug=True))
+ rest_service_thread.start()
+ log.info('main thread is mine!')
+ import time
+ while True:
+ print(time.ctime())
+ time.sleep(1)
+ rest_service_thread.join()
** Résultat d'exécution **
Werkzeug, le serveur Web de Flask, génère des erreurs. Puisque le thread principal est actif, l'heure est affichée, mais l'API REST ne répond pas.
2016-12-20 16:30:32,129 root MainThread start
2016-12-20 16:30:32,130 root MainThread main thread is mine!
Tue Dec 20 16:30:32 2016
2016-12-20 16:30:32,141 werkzeug reset_service * Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)
Exception in thread reset_service:
Traceback (most recent call last):
File "/usr/local/Cellar/python3/3.5.2_3/Frameworks/Python.framework/Versions/3.5/lib/python3.5/threading.py", line 914, in _bootstrap_inner
self.run()
File "/usr/local/Cellar/python3/3.5.2_3/Frameworks/Python.framework/Versions/3.5/lib/python3.5/threading.py", line 862, in run
self._target(*self._args, **self._kwargs)
File "/Users/knoguchi/.virtualenvs/flask3/lib/python3.5/site-packages/flask/app.py", line 843, in run
run_simple(host, port, self, **options)
File "/Users/knoguchi/.virtualenvs/flask3/lib/python3.5/site-packages/werkzeug/serving.py", line 692, in run_simple
reloader_type)
File "/Users/knoguchi/.virtualenvs/flask3/lib/python3.5/site-packages/werkzeug/_reloader.py", line 242, in run_with_reloader
signal.signal(signal.SIGTERM, lambda *args: sys.exit(0))
File "/usr/local/Cellar/python3/3.5.2_3/Frameworks/Python.framework/Versions/3.5/lib/python3.5/signal.py", line 47, in signal
handler = _signal.signal(_enum_to_int(signalnum), _enum_to_int(handler))
ValueError: signal only works in main thread
Tue Dec 20 16:30:33 2016
Tue Dec 20 16:30:34 2016
Il semble que le gestionnaire de signaux doit être MainThread. Il semble que ce soit une fonction pratique en cours de développement de Flask pour surveiller les modifications et recharger les fichiers, mais comme cela n'est pas nécessaire, supprimez debug = True
et réessayez.
** Résultat d'exécution 2 **
Cette fois, ça s'est bien passé. Si vous appuyez sur l'API REST à l'adresse http: // localhost: 5000, elle répondra correctement.
2016-12-20 16:38:54,214 root MainThread start
2016-12-20 16:38:54,215 root MainThread main thread is mine!
Tue Dec 20 16:38:54 2016
2016-12-20 16:38:54,224 werkzeug reset_service * Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)
Tue Dec 20 16:38:55 2016
2016-12-20 16:38:55,840 werkzeug reset_service 127.0.0.1 - - [20/Dec/2016 16:38:55] "GET / HTTP/1.1" 200 -
Tue Dec 20 16:38:56 2016
2016-12-20 16:38:56,827 werkzeug reset_service 127.0.0.1 - - [20/Dec/2016 16:38:56] "GET / HTTP/1.1" 200 -
Tue Dec 20 16:38:57 2016
Vous avez maintenant les bases pour fournir l'API REST tout en allouant le thread principal pour le démon.
Le moyen le plus simple d'envoyer simplement des commandes de l'API REST au démon est d'utiliser Queue
. La file d'attente de Python est thread-safe, donc la création d'un objet Queue global et le placement / l'obtention de plusieurs threads ne causeront pas d'incohérences. Cependant, il n'y a qu'un nombre limité de cas où c'est tout ce qui est nécessaire.
Modifions davantage example_threaded.py et expérimentons la mise à jour de la variable notes à partir du thread principal et du thread du serveur Web. Le fil principal ajoute la chaîne entrée de STDIN aux notes.
Puisque les notes sont des dictionnaires, des incohérences se produiront si elles sont accédées en même temps, et une erreur se produira surtout si des clés supplémentaires ou supprimées se produisent lors de l'accès dans une boucle for. J'ai donc essayé de le verrouiller avec un décorateur «synchronisé» pour protéger la section critique.
$ diff -u example_threaded.py example_threaded2.py
--- example_threaded.py 2016-12-20 17:17:10.000000000 -0800
+++ example_threaded2.py 2016-12-20 17:29:06.000000000 -0800
@@ -23,7 +23,38 @@
'text': notes[key]
}
+def synchronized(lock):
+ """ Synchronization decorator. """
+ def wrap(f):
+ def newFunction(*args, **kw):
+ lock.acquire()
+ try:
+ return f(*args, **kw)
+ finally:
+ lock.release()
+ return newFunction
+ return wrap
+
+glock = threading.Lock()
+
+@synchronized(glock)
+def list_notes():
+ return [note_repr(idx) for idx in sorted(notes.keys())]
+
+@synchronized(glock)
+def add_note(note):
+ idx = max(notes.keys()) + 1
+ notes[idx] = note
+ return idx
+@synchronized(glock)
+def update_note(key, note):
+ notes[key] = note
+
+@synchronized(glock)
+def delete_note(key):
+ return notes.pop(key, None)
+
@app.route("/", methods=['GET', 'POST'])
def notes_list():
"""
@@ -31,12 +62,11 @@
"""
if request.method == 'POST':
note = str(request.data.get('text', ''))
- idx = max(notes.keys()) + 1
- notes[idx] = note
+ idx = add_note(note)
return note_repr(idx), status.HTTP_201_CREATED
# request.method == 'GET'
- return [note_repr(idx) for idx in sorted(notes.keys())]
+ return list_notes()
@app.route("/<int:key>/", methods=['GET', 'PUT', 'DELETE'])
@@ -46,11 +76,11 @@
"""
if request.method == 'PUT':
note = str(request.data.get('text', ''))
- notes[key] = note
+ update_note(key, note)
return note_repr(key)
elif request.method == 'DELETE':
- notes.pop(key, None)
+ delete_note(key)
return '', status.HTTP_204_NO_CONTENT
# request.method == 'GET'
@@ -60,14 +90,15 @@
if __name__ == "__main__":
-
+
log.info('start')
- rest_service_thread = threading.Thread(name='reset_service', target=app.run, kwargs=dict(debug=True))
+ rest_service_thread = threading.Thread(name='reset_service', target=app.run, kwargs=dict())
rest_service_thread.start()
log.info('main thread is mine!')
- import time
- while True:
- print(time.ctime())
- time.sleep(1)
+ import sys
+ for line in iter(sys.stdin):
+ if not line:
+ break
+ add_note(line.strip())
rest_service_thread.join()
** Résultat d'exécution **
Après le démarrage, entrez une chaîne de caractères appropriée à partir du terminal, et lorsque vous obtenez la liste de l'API REST, vous pouvez confirmer que la chaîne de caractères saisie est ajoutée.
Les processus d'arrière-plan sont vaguement appelés démons, mais les démons qui se comportent bien ont beaucoup à faire.
--Définir umask --fork et quittez le processus parent. L'enfant est adopté par init.
Pour Python, daemonize et python-daemon ) Peut être fait facilement. Utilisons python-daemon avec les paramètres minimum. Je ne peux pas utiliser stdin, j'ai donc essayé d'ajouter une note toutes les 3 secondes à la place. Si vous accédez à l'API REST, vous pouvez voir que les notes augmentent toutes les 3 secondes.
--- example_threaded2.py 2016-12-25 16:25:08.000000000 -0800
+++ example_daemon.py 2016-12-25 16:29:29.000000000 -0800
@@ -1,8 +1,9 @@
import logging
import threading
+import daemon
from flask import request, url_for
-from flask.ext.api import FlaskAPI, status, exceptions
+from flask_api import FlaskAPI, status, exceptions
FORMAT = '%(asctime)-15s %(name)s %(threadName)s %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)
@@ -89,16 +90,16 @@
return note_repr(key)
-if __name__ == "__main__":
-
+def main():
log.info('start')
rest_service_thread = threading.Thread(name='reset_service', target=app.run, kwargs=dict(threaded=True))
rest_service_thread.start()
log.info('main thread is mine!')
- import sys
- for line in iter(sys.stdin):
- if not line:
- break
- add_note(line.strip())
+ import time
+ for i in range(10):
+ add_note("note{}".format(i))
+ time.sleep(3)
rest_service_thread.join()
+with daemon.DaemonContext():
+ main()
Courir et voir
$ python example_daemon.py
$
Même si vous le démarrez sans &, il reviendra au shell en un instant. Le programme se montre. Vérifions avec ps. La commande ps est pour Linux.
$ ps xao pid,ppid,pgid,sid,comm | grep python
2860 1 2859 2859 python
L'ID de processus parent est 1 et il est adopté. L'ID de session et l'ID de groupe de processus correspondent. À propos, l'ID de session de bash dans le shell de connexion était 2693, il s'agit donc d'une nouvelle session comme prévu. Il semble que le fork soit fait deux fois, et pid et pgid sont différents. La raison de faire un fork deux fois est la suivante (de Advanced Programming in Unix Chapter 13)
Under System V–based systems, some people recommend calling fork again at this point, terminating the parent, and continuing the daemon in the child. This guarantees that the daemon is not a session leader, which prevents it from acquiring a controlling terminal under the System V rules (Section 9.6). Alternatively, to avoid acquiring a controlling terminal, be sure to specify O_NOCTTY whenever opening a terminal device.
Jetez un œil au descripteur de fichier avec lsof -p 2860.
lsof -p 2860
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
python 2860 root cwd DIR 254,1 4096 2 /
python 2860 root rtd DIR 254,1 4096 2 /
python 2860 root txt REG 254,1 3781768 264936 /usr/bin/python2.7
python 2860 root mem REG 254,1 47712 1045078 /lib/x86_64-linux-gnu/libnss_files-2.19.so
python 2860 root mem REG 254,1 54248 391882 /usr/lib/python2.7/lib-dynload/_json.x86_64-linux-gnu.so
python 2860 root mem REG 254,1 18904 1044589 /lib/x86_64-linux-gnu/libuuid.so.1.3.0
python 2860 root mem REG 254,1 31048 265571 /usr/lib/x86_64-linux-gnu/libffi.so.6.0.2
python 2860 root mem REG 254,1 141184 392622 /usr/lib/python2.7/lib-dynload/_ctypes.x86_64-linux-gnu.so
python 2860 root mem REG 254,1 10464 796274 /usr/lib/python2.7/dist-packages/markupsafe/_speedups.so
python 2860 root mem REG 254,1 29464 392892 /usr/lib/python2.7/lib-dynload/_hashlib.x86_64-linux-gnu.so
python 2860 root mem REG 254,1 2066816 264782 /usr/lib/x86_64-linux-gnu/libcrypto.so.1.0.0
python 2860 root mem REG 254,1 395176 264784 /usr/lib/x86_64-linux-gnu/libssl.so.1.0.0
python 2860 root mem REG 254,1 97872 392612 /usr/lib/python2.7/lib-dynload/_ssl.x86_64-linux-gnu.so
python 2860 root mem REG 254,1 11248 392000 /usr/lib/python2.7/lib-dynload/resource.x86_64-linux-gnu.so
python 2860 root mem REG 254,1 1607712 269275 /usr/lib/locale/locale-archive
python 2860 root mem REG 254,1 1738176 1045067 /lib/x86_64-linux-gnu/libc-2.19.so
python 2860 root mem REG 254,1 1051056 1045072 /lib/x86_64-linux-gnu/libm-2.19.so
python 2860 root mem REG 254,1 109144 1044580 /lib/x86_64-linux-gnu/libz.so.1.2.8
python 2860 root mem REG 254,1 10680 1045291 /lib/x86_64-linux-gnu/libutil-2.19.so
python 2860 root mem REG 254,1 14664 1045071 /lib/x86_64-linux-gnu/libdl-2.19.so
python 2860 root mem REG 254,1 137440 1044987 /lib/x86_64-linux-gnu/libpthread-2.19.so
python 2860 root mem REG 254,1 140928 1044988 /lib/x86_64-linux-gnu/ld-2.19.so
python 2860 root 0u CHR 1,3 0t0 5593 /dev/null
python 2860 root 1u CHR 1,3 0t0 5593 /dev/null
python 2860 root 2u CHR 1,3 0t0 5593 /dev/null
python 2860 root 3u IPv4 3596677 0t0 TCP localhost:5000 (LISTEN)
Oh! STDIN (0), STDOUT (1), STDERR (2) sont merveilleusement / dev / null. Et le répertoire de travail actuel (cwd) est /. Le répertoire racine (rtd) est également /, mais cela peut être changé en passant le paramètre à chroot. C'est un démon vraiment orthodoxe! Cela fonctionne bien même si je me déconnecte ou que je le démarre à partir du script rc.
Nous sommes toujours en train d'expérimenter diverses choses, alors faites-nous savoir s'il existe une méthode plus légère ou une méthode qui peut être réalisée sans verrou global.
Le code est Python3, mais avec quelques lignes de modification, il devrait également fonctionner en Python2. threaded = True sert à accepter plusieurs demandes en même temps et n'a rien à voir avec le fil de discussion de cet article.
import logging
import threading
import daemon
from flask import request, url_for
from flask_api import FlaskAPI, status, exceptions
FORMAT = '%(asctime)-15s %(name)s %(threadName)s %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)
log = logging.getLogger()
app = FlaskAPI(__name__)
notes = {
0: 'do the shopping',
1: 'build the codez',
2: 'paint the door',
}
def note_repr(key):
return {
'url': request.host_url.rstrip('/') + url_for('notes_detail', key=key),
'text': notes[key]
}
def synchronized(lock):
""" Synchronization decorator. """
def wrap(f):
def newFunction(*args, **kw):
lock.acquire()
try:
return f(*args, **kw)
finally:
lock.release()
return newFunction
return wrap
glock = threading.Lock()
@synchronized(glock)
def list_notes():
return [note_repr(idx) for idx in sorted(notes.keys())]
@synchronized(glock)
def add_note(note):
idx = max(notes.keys()) + 1
notes[idx] = note
return idx
@synchronized(glock)
def update_note(key, note):
notes[key] = note
@synchronized(glock)
def delete_note(key):
return notes.pop(key, None)
@app.route("/", methods=['GET', 'POST'])
def notes_list():
"""
List or create notes.
"""
if request.method == 'POST':
note = str(request.data.get('text', ''))
idx = add_note(note)
return note_repr(idx), status.HTTP_201_CREATED
# request.method == 'GET'
return list_notes()
@app.route("/<int:key>/", methods=['GET', 'PUT', 'DELETE'])
def notes_detail(key):
"""
Retrieve, update or delete note instances.
"""
if request.method == 'PUT':
note = str(request.data.get('text', ''))
update_note(key, note)
return note_repr(key)
elif request.method == 'DELETE':
delete_note(key)
return '', status.HTTP_204_NO_CONTENT
# request.method == 'GET'
if key not in notes:
raise exceptions.NotFound()
return note_repr(key)
def main():
log.info('start')
rest_service_thread = threading.Thread(name='reset_service', target=app.run, kwargs=dict(threaded=True))
rest_service_thread.start()
log.info('main thread is mine!')
import time
for i in range(10):
add_note("note{}".format(i))
time.sleep(3)
rest_service_thread.join()
with daemon.DaemonContext():
main()
Recommended Posts