I've been using fluentd in real production for a week now, and it's very convenient. I pass the data on to a Python program with the exec Output Plugin.
Here is the Python example from the documentation (http://docs.fluentd.org/articles/out_exec):
td-agent.conf
<match fizzbuzz>
  type exec
  command python /path/to/fizzbuzz.py
  buffer_path /path/to/buffer_path
  format tsv
  keys fizzbuzz
  flush_interval 5s # for debugging/checking
</match>
In practice, though, I want to use the Python installed in a virtualenv, and I was wondering what happens to PYTHONPATH, so I specified `command` like this:
td-agent.conf
# python is the one installed in the virtualenv
# Write a setup.py for my code (myapp) and install the package beforehand
command /path/to/myapp/bin/python -m myapp.command.xxxx arg1
Now you don't have to worry about PYTHONPATH, and you don't have to write long file paths. Write a setup.py and save yourself the hassle.
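A minimal setup.py for this layout (package name and version are assumptions) might look like:

```python
# Hypothetical setup.py for the myapp package used above
from setuptools import setup, find_packages

setup(
    name='myapp',
    version='0.1.0',
    packages=find_packages(),  # picks up myapp and myapp.command
)
```

After `/path/to/myapp/bin/python setup.py install`, `python -m myapp.command.xxxx` resolves without touching PYTHONPATH.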
Trying to set an environment variable
td-agent.conf
# Specify an environment variable (causes an error)
command HOGE=FUGA /path/to/myapp/bin/python -m myapp.command.xxx arg1
This produced the following error. If you want to pass something in, use an argument instead. Status 32512 was new to me.
td-agent.log
2014-08-03 17:59:54 +0900 [warn]: temporarily failed to flush the buffer. next_retry=2014-08-03 17:59:57 +0900 error_class="RuntimeError" error="command returns 32512: HOGE=FUGA /path/to/myapp/bin/python /path/to/myapp/myapp/command/xxx.py /path/to/buffer_path/.20140803.q4ffb5d85bf81f0d4.log"
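That 32512 is likely a raw POSIX wait status: the child's exit code shifted left 8 bits, and exit code 127 means "command not found" (the spawner tried to run a program literally named `HOGE=FUGA`). Since 127 << 8 == 32512, you can reproduce the same number with `os.system` (a sketch; output assumes a POSIX system):

```python
import os

# os.system() returns the raw wait status on POSIX. A command the
# shell cannot find exits with 127, so the status is 127 << 8 == 32512,
# the same number fluentd logged.
status = os.system('this-command-does-not-exist 2>/dev/null')
print(status, os.WEXITSTATUS(status))
```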
The path of the buffer file is passed as the last argument. The code that reads and processes it line by line looks like this. Separating each step into its own method lets you call them individually from test code. You can write it cleanly as stream processing, from reading the file through to the final step, and with PyMongo you can pass the generator directly to Collection's insert method, so performance is good too.
/path/to/myapp/myapp/command/xxx.py
# coding=utf-8
import json
import logging
import logging.config
import sys
import traceback

logging.config.fileConfig('logging.conf')
logger = logging.getLogger('fluent-exec-out')


def main():
    file_path = parse_args()
    do_something(exclude_error_row(parse(readline(file_path))))


def parse_args():
    return sys.argv[-1]


def readline(file_path):
    with open(file_path) as f:
        for line in f:
            yield line


def parse(lines):
    # When the input format is JSON
    for line in lines:
        yield json.loads(line)


def exclude_error_row(rows):
    for row in rows:
        # Validation and log output
        if xxxxxx:
            logger.warn("Invalid line found %s", row)
            continue
        yield row


def do_something(rows):
    # Do something
    pass


if __name__ == '__main__':
    logger.info('Start')
    try:
        main()
    except:
        logger.error(traceback.format_exc())
    logger.info('End')
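As a self-contained illustration of that generator pipeline (a sketch with a made-up validation rule, using io.StringIO in place of the buffer file), nothing is read or parsed until the final consumer iterates:

```python
import io
import json


def readline(fp):
    for line in fp:
        yield line


def parse(lines):
    # Input format is JSON, one object per line
    for line in lines:
        yield json.loads(line)


def exclude_error_row(rows):
    for row in rows:
        # Hypothetical validation: keep only rows carrying a "value" key
        if 'value' not in row:
            continue
        yield row


buf = io.StringIO('{"value": 1}\n{"oops": true}\n{"value": 2}\n')
rows = list(exclude_error_row(parse(readline(buf))))
print(rows)  # [{'value': 1}, {'value': 2}]
```

Because each stage is a generator, the whole chain processes one line at a time, so memory stays flat even for large buffer files.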
To test the main method, create a file in the same format as the buffer file passed from fluentd and pass it as an argument.
test_xxx.py
# coding=utf-8
import sys

from nose.tools import ok_, eq_

from myapp.command import xxx

# Copy, not alias: the tests mutate sys.argv in place
original_argv = list(sys.argv)


class TestAll(object):

    def teardown(self):
        sys.argv = list(original_argv)

    def test_main(self):
        sys.argv.append('./tests/data/fluentd_buffer.log')
        xxx.main()
        # Test something

    def test_readline(self):
        gen = xxx.readline('./tests/data/fluentd_buffer.log')
        # Test something
It might be cool to use something like fluent-logger-python and pass the log to fluentd.
The code above also does the filtering itself, but it might be cleaner to let fluentd handle it in an earlier match using the exec_filter Output Plugin. Either way, it's a matter of taste.
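For example, a filtering stage with exec_filter could look roughly like this (a sketch in the same old td-agent syntax as above; the tag and module names are placeholders):

```
# Hypothetical filtering stage: myapp.command.filter reads JSON records
# on stdin, writes only the valid ones to stdout, and fluentd re-emits
# them under the "filtered.fizzbuzz" tag for the next <match> to handle.
<match fizzbuzz>
  type exec_filter
  command /path/to/myapp/bin/python -m myapp.command.filter
  in_format json
  out_format json
  tag filtered.fizzbuzz
</match>
```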
If it's just filtering, I think combining everything into one exec Output is fine, but if you pack too many steps into a single process, you have more to worry about when it dies partway through. In that case it's better to split the work into separate exec Outputs calling separate code.