I'm writing a daemon in Python that runs 24/7. I decided to add a REST API so I can check its operating status and change settings dynamically, and I looked for a way to implement it with minimal effort, without major changes to the main routine.
Providing a REST API requires some kind of web server. Since it is not exposed to the outside, I'll use the development web server bundled with Flask. Although it is intended for development, it supports concurrent access via threads, so it can stand up to practical use here.
I decided to use Flask-API, which is easy to build with and even comes with a browsable test UI.
The experiments below start from the Flask-API sample. Copy and paste it and save it as example.py: http://www.flaskapi.org/#example
To run the daemon's main routine and the web server at the same time, you'll need either asyncio or threads. Flask isn't designed to be used with asyncio, so I'll use threads.
Which should take the lead, the main routine or the web server? Since the REST API is an add-on feature, it seems better to make the daemon the main thread and the web server a derived thread. (Postscript: this may be unfounded. From a daemon-management perspective, making the web server the main thread and running the daemon's work in worker threads may be more natural.)
Create a thread in the main routine and start Flask's server from it. Print the time every second to confirm that the main thread is not blocked.
```diff
$ diff -u example.py example_threaded.py
--- example.py  2016-12-20 16:19:19.000000000 -0800
+++ example_threaded.py 2016-12-20 16:23:43.000000000 -0800
@@ -1,6 +1,13 @@
+import logging
+import threading
+
 from flask import request, url_for
 from flask.ext.api import FlaskAPI, status, exceptions
+FORMAT = '%(asctime)-15s %(name)s %(threadName)s %(message)s'
+logging.basicConfig(format=FORMAT, level=logging.INFO)
+log = logging.getLogger()
+
 app = FlaskAPI(__name__)
@@ -53,5 +60,14 @@
 if __name__ == "__main__":
-    app.run(debug=True)
+
+    log.info('start')
+    rest_service_thread = threading.Thread(name='reset_service', target=app.run, kwargs=dict(debug=True))
+    rest_service_thread.start()
+    log.info('main thread is mine!')
+    import time
+    while True:
+        print(time.ctime())
+        time.sleep(1)
+    rest_service_thread.join()
```
**Execution result**
Werkzeug, the web server bundled with Flask, throws an exception. Since the main thread is alive, the time is still displayed, but the REST API does not respond.
```
2016-12-20 16:30:32,129 root MainThread start
2016-12-20 16:30:32,130 root MainThread main thread is mine!
Tue Dec 20 16:30:32 2016
2016-12-20 16:30:32,141 werkzeug reset_service  * Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)
Exception in thread reset_service:
Traceback (most recent call last):
  File "/usr/local/Cellar/python3/3.5.2_3/Frameworks/Python.framework/Versions/3.5/lib/python3.5/threading.py", line 914, in _bootstrap_inner
    self.run()
  File "/usr/local/Cellar/python3/3.5.2_3/Frameworks/Python.framework/Versions/3.5/lib/python3.5/threading.py", line 862, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/knoguchi/.virtualenvs/flask3/lib/python3.5/site-packages/flask/app.py", line 843, in run
    run_simple(host, port, self, **options)
  File "/Users/knoguchi/.virtualenvs/flask3/lib/python3.5/site-packages/werkzeug/serving.py", line 692, in run_simple
    reloader_type)
  File "/Users/knoguchi/.virtualenvs/flask3/lib/python3.5/site-packages/werkzeug/_reloader.py", line 242, in run_with_reloader
    signal.signal(signal.SIGTERM, lambda *args: sys.exit(0))
  File "/usr/local/Cellar/python3/3.5.2_3/Frameworks/Python.framework/Versions/3.5/lib/python3.5/signal.py", line 47, in signal
    handler = _signal.signal(_enum_to_int(signalnum), _enum_to_int(handler))
ValueError: signal only works in main thread
Tue Dec 20 16:30:33 2016
Tue Dec 20 16:30:34 2016
```
A signal handler can only be installed from the main thread. The handler comes from Flask's reloader, a development convenience that watches files for changes and restarts the server. We don't need it, so delete debug=True and try again.
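The failure is easy to reproduce without Flask at all. This minimal sketch (my own code, not from the article) installs a signal handler from a non-main thread, which is essentially what werkzeug's reloader tried to do:

```python
import signal
import sys
import threading

errors = []

def install_handler():
    # Mimics the reloader: register a SIGTERM handler.
    # signal.signal() is only allowed in the main thread, so this fails.
    try:
        signal.signal(signal.SIGTERM, lambda *args: sys.exit(0))
    except ValueError as e:
        errors.append(str(e))

t = threading.Thread(target=install_handler)
t.start()
t.join()
print(errors)  # the ValueError message mentions the main thread
```

This is why the error disappears once debug=True is removed: without the reloader, Flask never touches the signal module from the derived thread.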
**Execution result 2**
This time it went well. Hitting the REST API at http://localhost:5000 returns a proper response.
```
2016-12-20 16:38:54,214 root MainThread start
2016-12-20 16:38:54,215 root MainThread main thread is mine!
Tue Dec 20 16:38:54 2016
2016-12-20 16:38:54,224 werkzeug reset_service  * Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)
Tue Dec 20 16:38:55 2016
2016-12-20 16:38:55,840 werkzeug reset_service 127.0.0.1 - - [20/Dec/2016 16:38:55] "GET / HTTP/1.1" 200 -
Tue Dec 20 16:38:56 2016
2016-12-20 16:38:56,827 werkzeug reset_service 127.0.0.1 - - [20/Dec/2016 16:38:56] "GET / HTTP/1.1" 200 -
Tue Dec 20 16:38:57 2016
```
Now the groundwork is in place: the main thread is reserved for the daemon while the REST API is served from a derived thread.
The easiest way to simply send commands from the REST API to the daemon is a Queue. Python's Queue is thread-safe, so you can create a global Queue object and put/get from multiple threads without inconsistencies. However, a queue alone covers only a limited number of cases.
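As a sketch of that pattern (the function names here are mine, not from the article's code): the web server thread puts commands on a global Queue, and the daemon's main loop polls it without blocking.

```python
import queue
import threading

commands = queue.Queue()  # global and thread-safe

def handle_request(cmd):
    # Would be called inside a Flask view, i.e. on the web server thread.
    commands.put(cmd)

def poll_commands():
    # Called from the daemon's main loop; drains pending commands, never blocks.
    drained = []
    while True:
        try:
            drained.append(commands.get_nowait())
        except queue.Empty:
            return drained

# Simulate the web thread sending a command to the daemon.
t = threading.Thread(target=handle_request, args=('reload-config',))
t.start()
t.join()
print(poll_commands())  # ['reload-config']
```

No lock is needed here because Queue does its own internal locking; the limitation is that it only moves data one way, which is why shared state (below) needs a different approach.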
Let's modify example_threaded.py further and experiment with updating the notes variable from both the main thread and the web server thread. The main thread appends strings entered on STDIN to notes.
Since notes is a dictionary, concurrent access can corrupt it; in particular, adding or deleting keys while another thread iterates over it in a for loop raises an error. So I protect the critical sections with a lock, via a synchronized decorator.
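The iteration hazard is easy to demonstrate without any threads at all: CPython refuses to continue a for loop over a dict whose key set changed underneath it.

```python
d = {0: 'a', 1: 'b', 2: 'c'}

try:
    for key in d:
        # Simulate another thread inserting a key mid-iteration.
        d[len(d)] = 'new'
except RuntimeError as e:
    print(e)  # dictionary changed size during iteration
```

With real threads the failure is the same but intermittent, which is exactly why the lock below guards every read and write of notes.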
```diff
$ diff -u example_threaded.py example_threaded2.py
--- example_threaded.py 2016-12-20 17:17:10.000000000 -0800
+++ example_threaded2.py    2016-12-20 17:29:06.000000000 -0800
@@ -23,7 +23,38 @@
         'text': notes[key]
     }
+def synchronized(lock):
+    """ Synchronization decorator. """
+    def wrap(f):
+        def newFunction(*args, **kw):
+            lock.acquire()
+            try:
+                return f(*args, **kw)
+            finally:
+                lock.release()
+        return newFunction
+    return wrap
+
+glock = threading.Lock()
+
+@synchronized(glock)
+def list_notes():
+    return [note_repr(idx) for idx in sorted(notes.keys())]
+
+@synchronized(glock)
+def add_note(note):
+    idx = max(notes.keys()) + 1
+    notes[idx] = note
+    return idx
+@synchronized(glock)
+def update_note(key, note):
+    notes[key] = note
+
+@synchronized(glock)
+def delete_note(key):
+    return notes.pop(key, None)
+
 @app.route("/", methods=['GET', 'POST'])
 def notes_list():
     """
@@ -31,12 +62,11 @@
     """
     if request.method == 'POST':
         note = str(request.data.get('text', ''))
-        idx = max(notes.keys()) + 1
-        notes[idx] = note
+        idx = add_note(note)
         return note_repr(idx), status.HTTP_201_CREATED
     # request.method == 'GET'
-    return [note_repr(idx) for idx in sorted(notes.keys())]
+    return list_notes()
 @app.route("/<int:key>/", methods=['GET', 'PUT', 'DELETE'])
@@ -46,11 +76,11 @@
     """
     if request.method == 'PUT':
         note = str(request.data.get('text', ''))
-        notes[key] = note
+        update_note(key, note)
         return note_repr(key)
     elif request.method == 'DELETE':
-        notes.pop(key, None)
+        delete_note(key)
         return '', status.HTTP_204_NO_CONTENT
     # request.method == 'GET'
@@ -60,14 +90,15 @@
 if __name__ == "__main__":
-
+
     log.info('start')
-    rest_service_thread = threading.Thread(name='reset_service', target=app.run, kwargs=dict(debug=True))
+    rest_service_thread = threading.Thread(name='reset_service', target=app.run, kwargs=dict())
     rest_service_thread.start()
     log.info('main thread is mine!')
-    import time
-    while True:
-        print(time.ctime())
-        time.sleep(1)
+    import sys
+    for line in iter(sys.stdin):
+        if not line:
+            break
+        add_note(line.strip())
     rest_service_thread.join()
```
**Execution result**
After starting it, enter some strings in the terminal; fetching the list from the REST API confirms that the entered strings were added.
Background processes are loosely called daemons, but a well-behaved daemon has a lot to do:
- Set umask
- fork and exit the parent process; the child is adopted by init
- Put the child process in its own process group and create a new session so that it does not receive terminal signals (it does not have to be a session leader; System V recommends a non-session-leader without a controlling terminal, see below)
- Set the current directory to /
- Close unnecessary file descriptors
- Redirect stdin, stdout, and stderr to /dev/null
- Set a secure PATH
- Create a lock file (PID file) so it cannot be started twice
- Handle or ignore signals correctly
- Leave logs via syslog etc.
- chroot
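For reference, the core of that checklist can be sketched with nothing but the os module. This is a simplified illustration of the classic double-fork dance, not the python-daemon implementation; it skips the PID file, signal handling, PATH, and chroot items.

```python
import os

def daemonize():
    """Detach from the terminal: double-fork, new session, / as cwd,
    standard streams on /dev/null. A sketch of the checklist above."""
    if os.fork() > 0:
        os._exit(0)          # parent exits; child is adopted by init
    os.setsid()              # new session + process group, no controlling tty
    if os.fork() > 0:
        os._exit(0)          # first child exits; grandchild is not a session leader
    os.umask(0o022)          # predictable file-creation mask
    os.chdir('/')            # don't keep any mount point busy
    devnull = os.open(os.devnull, os.O_RDWR)
    for fd in (0, 1, 2):     # stdin, stdout, stderr -> /dev/null
        os.dup2(devnull, fd)
```

The second fork is what guarantees the daemon is not a session leader, which matters under System V rules, as quoted further below.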
For Python, libraries such as daemonize and python-daemon make this easy. Let's use python-daemon with the minimum settings. stdin can no longer be used, so I add a note every 3 seconds instead. If you access the REST API, you can see the notes increasing every 3 seconds.
```diff
--- example_threaded2.py    2016-12-25 16:25:08.000000000 -0800
+++ example_daemon.py   2016-12-25 16:29:29.000000000 -0800
@@ -1,8 +1,9 @@
 import logging
 import threading
+import daemon
 from flask import request, url_for
-from flask.ext.api import FlaskAPI, status, exceptions
+from flask_api import FlaskAPI, status, exceptions
 FORMAT = '%(asctime)-15s %(name)s %(threadName)s %(message)s'
 logging.basicConfig(format=FORMAT, level=logging.INFO)
@@ -89,16 +90,16 @@
     return note_repr(key)
-if __name__ == "__main__":
-
+def main():
     log.info('start')
     rest_service_thread = threading.Thread(name='reset_service', target=app.run, kwargs=dict(threaded=True))
     rest_service_thread.start()
     log.info('main thread is mine!')
-    import sys
-    for line in iter(sys.stdin):
-        if not line:
-            break
-        add_note(line.strip())
+    import time
+    for i in range(10):
+        add_note("note{}".format(i))
+        time.sleep(3)
     rest_service_thread.join()
+with daemon.DaemonContext():
+    main()
```
Run and see
```
$ python example_daemon.py
$
```
Even though it was started without &, it returns to the shell in an instant: the program daemonizes itself. Let's check with ps (the ps invocation below is for Linux).
```
$ ps xao pid,ppid,pgid,sid,comm | grep python
2860 1 2859 2859 python
```
The parent process ID is 1, so the process has been adopted by init. The session ID and process group ID match; incidentally, the session ID of the login shell's bash was 2693, so a new session was created as expected. Since pid and pgid differ, it appears fork was called twice. The reason for forking twice is as follows (from Advanced Programming in the UNIX Environment, Chapter 13):
> Under System V–based systems, some people recommend calling fork again at this point, terminating the parent, and continuing the daemon in the child. This guarantees that the daemon is not a session leader, which prevents it from acquiring a controlling terminal under the System V rules (Section 9.6). Alternatively, to avoid acquiring a controlling terminal, be sure to specify O_NOCTTY whenever opening a terminal device.
Take a look at the file descriptors with lsof -p 2860.
```
lsof -p 2860
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
python 2860 root cwd DIR 254,1 4096 2 /
python 2860 root rtd DIR 254,1 4096 2 /
python 2860 root txt REG 254,1 3781768 264936 /usr/bin/python2.7
python 2860 root mem REG 254,1 47712 1045078 /lib/x86_64-linux-gnu/libnss_files-2.19.so
python 2860 root mem REG 254,1 54248 391882 /usr/lib/python2.7/lib-dynload/_json.x86_64-linux-gnu.so
python 2860 root mem REG 254,1 18904 1044589 /lib/x86_64-linux-gnu/libuuid.so.1.3.0
python 2860 root mem REG 254,1 31048 265571 /usr/lib/x86_64-linux-gnu/libffi.so.6.0.2
python 2860 root mem REG 254,1 141184 392622 /usr/lib/python2.7/lib-dynload/_ctypes.x86_64-linux-gnu.so
python 2860 root mem REG 254,1 10464 796274 /usr/lib/python2.7/dist-packages/markupsafe/_speedups.so
python 2860 root mem REG 254,1 29464 392892 /usr/lib/python2.7/lib-dynload/_hashlib.x86_64-linux-gnu.so
python 2860 root mem REG 254,1 2066816 264782 /usr/lib/x86_64-linux-gnu/libcrypto.so.1.0.0
python 2860 root mem REG 254,1 395176 264784 /usr/lib/x86_64-linux-gnu/libssl.so.1.0.0
python 2860 root mem REG 254,1 97872 392612 /usr/lib/python2.7/lib-dynload/_ssl.x86_64-linux-gnu.so
python 2860 root mem REG 254,1 11248 392000 /usr/lib/python2.7/lib-dynload/resource.x86_64-linux-gnu.so
python 2860 root mem REG 254,1 1607712 269275 /usr/lib/locale/locale-archive
python 2860 root mem REG 254,1 1738176 1045067 /lib/x86_64-linux-gnu/libc-2.19.so
python 2860 root mem REG 254,1 1051056 1045072 /lib/x86_64-linux-gnu/libm-2.19.so
python 2860 root mem REG 254,1 109144 1044580 /lib/x86_64-linux-gnu/libz.so.1.2.8
python 2860 root mem REG 254,1 10680 1045291 /lib/x86_64-linux-gnu/libutil-2.19.so
python 2860 root mem REG 254,1 14664 1045071 /lib/x86_64-linux-gnu/libdl-2.19.so
python 2860 root mem REG 254,1 137440 1044987 /lib/x86_64-linux-gnu/libpthread-2.19.so
python 2860 root mem REG 254,1 140928 1044988 /lib/x86_64-linux-gnu/ld-2.19.so
python 2860 root 0u CHR 1,3 0t0 5593 /dev/null
python 2860 root 1u CHR 1,3 0t0 5593 /dev/null
python 2860 root 2u CHR 1,3 0t0 5593 /dev/null
python 2860 root 3u IPv4 3596677 0t0 TCP localhost:5000 (LISTEN)
```
Oh! stdin (0), stdout (1), and stderr (2) are all nicely pointed at /dev/null, and the current working directory (cwd) is /. The root directory (rtd) is also /, though that can be changed via the chroot setting. A truly orthodox daemon! It keeps working after logout and can be started from an rc script.
- Successfully ran Flask's development web server in a derived thread, keeping the main thread for the main routine.
- Confirmed that variables can be shared and updated between the main routine and the REST API.
- Implemented the whole thing as a well-behaved daemon.
I'm still experimenting with various things, so please let me know if there is a lighter-weight approach, or one that works without a global lock.
The full code is below. It is Python 3, but with a few lines of changes it should work in Python 2 as well. threaded=True is for accepting multiple requests concurrently and has nothing to do with the thread discussed in this article.
```python
import logging
import threading

import daemon
from flask import request, url_for
from flask_api import FlaskAPI, status, exceptions

FORMAT = '%(asctime)-15s %(name)s %(threadName)s %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)
log = logging.getLogger()

app = FlaskAPI(__name__)

notes = {
    0: 'do the shopping',
    1: 'build the codez',
    2: 'paint the door',
}


def note_repr(key):
    return {
        'url': request.host_url.rstrip('/') + url_for('notes_detail', key=key),
        'text': notes[key]
    }


def synchronized(lock):
    """ Synchronization decorator. """
    def wrap(f):
        def newFunction(*args, **kw):
            lock.acquire()
            try:
                return f(*args, **kw)
            finally:
                lock.release()
        return newFunction
    return wrap


glock = threading.Lock()


@synchronized(glock)
def list_notes():
    return [note_repr(idx) for idx in sorted(notes.keys())]


@synchronized(glock)
def add_note(note):
    idx = max(notes.keys()) + 1
    notes[idx] = note
    return idx


@synchronized(glock)
def update_note(key, note):
    notes[key] = note


@synchronized(glock)
def delete_note(key):
    return notes.pop(key, None)


@app.route("/", methods=['GET', 'POST'])
def notes_list():
    """
    List or create notes.
    """
    if request.method == 'POST':
        note = str(request.data.get('text', ''))
        idx = add_note(note)
        return note_repr(idx), status.HTTP_201_CREATED

    # request.method == 'GET'
    return list_notes()


@app.route("/<int:key>/", methods=['GET', 'PUT', 'DELETE'])
def notes_detail(key):
    """
    Retrieve, update or delete note instances.
    """
    if request.method == 'PUT':
        note = str(request.data.get('text', ''))
        update_note(key, note)
        return note_repr(key)

    elif request.method == 'DELETE':
        delete_note(key)
        return '', status.HTTP_204_NO_CONTENT

    # request.method == 'GET'
    if key not in notes:
        raise exceptions.NotFound()
    return note_repr(key)


def main():
    log.info('start')
    rest_service_thread = threading.Thread(name='reset_service', target=app.run, kwargs=dict(threaded=True))
    rest_service_thread.start()
    log.info('main thread is mine!')
    import time
    for i in range(10):
        add_note("note{}".format(i))
        time.sleep(3)
    rest_service_thread.join()


with daemon.DaemonContext():
    main()
```