The exec_filter output plugin (out_exec_filter) lets you filter records through an arbitrary external program. For example, you can drop records that the next stage doesn't need, or save malformed ones somewhere as errors. Here I use a Python program.
My configuration is as follows:
```
<match hoge>
  type exec_filter
  command /path/to/python -m myapp.fluent_stream_filter
  time_key time
  in_format json
  out_format msgpack
  buffer_type file
  buffer_path /path/to/buffer
  buffer_chunk_limit 8m
  buffer_queue_limit 64
  flush_interval 10s
  tag fuga
</match>
```
In `command`, I run the script as a module with `python -m` so that imports within the `myapp` package work. This way, there's no need to set PYTHONPATH.
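The sketch below checks that claim in a throwaway project. The layout, including a hypothetical `myapp/helpers.py`, is an assumption for illustration. The module is launched with `-m` from the project root, with `PYTHONPATH` removed from the environment, and it still imports a sibling module from its package. This works because `python -m` puts the current working directory at the front of `sys.path`. In practice, that means `myapp` must be reachable from the directory fluentd launches the command in, or it must be installed.

```python
import os
import subprocess
import sys
import tempfile


def run_as_module(project_root):
    """Run `python -m myapp.fluent_stream_filter` from project_root without PYTHONPATH."""
    env = dict(os.environ)
    env.pop("PYTHONPATH", None)
    env.pop("PYTHONSAFEPATH", None)  # Python 3.11+: would stop -m from adding the cwd
    result = subprocess.run(
        [sys.executable, "-m", "myapp.fluent_stream_filter"],
        cwd=project_root, env=env, capture_output=True, text=True, check=True,
    )
    return result.stdout.strip()


def demo():
    """Build a throwaway myapp package (hypothetical layout) and run it with -m."""
    with tempfile.TemporaryDirectory() as root:
        pkg = os.path.join(root, "myapp")
        os.mkdir(pkg)
        files = {
            "__init__.py": "",
            "helpers.py": "GREETING = 'imported ok'\n",
            # The entry module imports a sibling module by its package path.
            "fluent_stream_filter.py": "from myapp.helpers import GREETING\nprint(GREETING)\n",
        }
        for name, body in files.items():
            with open(os.path.join(pkg, name), "w") as f:
                f.write(body)
        return run_as_module(root)


if __name__ == "__main__":
    print(demo())  # imported ok
```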
I use a file buffer because fluentd has to be restarted whenever I modify the script, and buffered records written to disk survive the restart. The exec_filter output plugin also behaves differently from the exec output plugin. The exec plugin hands the command the path of a file to read, whereas exec_filter starts the command once, and the script then keeps waiting on standard input.
As exec_filter requires, the script reads records from standard input and writes the results to standard output.
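To exercise this stdin/stdout contract locally without fluentd, a rough harness like the following pipes JSON lines into a child process and reads back what it writes. It is a sketch under two assumptions. First, the child is a hypothetical stand-in filter, not `fluent_stream_filter`. Second, it writes JSON lines rather than MessagePack, so only the standard library is needed. Unlike exec_filter, which keeps the process running, this harness starts one process per call and closes its stdin at the end.

```python
import json
import subprocess
import sys

# Hypothetical stand-in filter: reads JSON lines, drops rows without a
# "message" key, and writes the remaining rows back as JSON lines.
CHILD = r'''
import json, sys
for line in sys.stdin:
    row = json.loads(line)
    if "message" not in row:
        continue
    sys.stdout.write(json.dumps(row) + "\n")
    sys.stdout.flush()
'''


def run_filter(rows):
    """Feed rows to a child filter process on stdin and collect what it writes to stdout."""
    proc = subprocess.run(
        [sys.executable, "-c", CHILD],
        input="".join(json.dumps(row) + "\n" for row in rows),
        capture_output=True,
        text=True,
        check=True,
    )
    return [json.loads(line) for line in proc.stdout.splitlines()]


if __name__ == "__main__":
    print(run_filter([{"message": "ok"}, {"level": "debug"}]))  # [{'message': 'ok'}]
```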
fluent_stream_filter.py
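```python
# coding=utf-8
"""
Code called by fluentd's out_exec_filter.
Reads records line by line from standard input and writes the results to standard output.
"""
import json
import logging
import logging.config
import sys
import traceback

import msgpack

logging.config.fileConfig('/path/to/logging.conf')
# Run via `python -m`, so __name__ is '__main__'; name the logger explicitly.
logger = logging.getLogger('fluent-exec')
# Keep records away from root handlers that might write to stdout.
logger.propagate = False

def main():
    stdin = sys.stdin
    # Lazy generator pipeline: each record flows through every stage in turn.
    output(convert(validate(parse(readline(stdin)))))

def readline(stdin):
    """Yield raw lines (each still ending in a newline) from the given stream."""
    for line in stdin:
        yield line

def parse(lines):
    """in_format json: one JSON object per line."""
    for line in lines:
        yield json.loads(line)

def validate(rows):
    """Error data removal"""
    for row in rows:
        try:
            pass  # Some kind of inspection; raise an exception for bad rows
        except Exception as e:
            logger.warning(e)
            logger.warning(row)
            continue
        yield row

def convert(rows):
    """Some conversion process"""
    for row in rows:
        # do something
        yield row

def output(rows):
    """Write to standard output (out_format msgpack)."""
    out = sys.stdout.buffer  # msgpack.packb returns bytes
    for row in rows:
        out.write(msgpack.packb(row))
        out.flush()  # don't let records sit in the pipe buffer

if __name__ == '__main__':
    logger.info('Start main')
    try:
        main()
    except Exception:
        logger.error(traceback.format_exc())
        raise  # re-raise with the original traceback
    logger.info('End main')
```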
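The `validate` and `convert` bodies above are placeholders. The sketch below shows one possible way to fill them in. Its schema is hypothetical (every record needs `time` and `message`), and so is its conversion (lower-casing `level`). Rejected rows are written to a separate `errors` stream as JSON lines, which is one way to save bad records somewhere as errors. For a self-contained example, the results are collected into a list instead of being packed with MessagePack.

```python
import io
import json

# Hypothetical schema: every record must carry these keys.
REQUIRED_KEYS = ("time", "message")


def parse(lines):
    """One JSON object per line, as with in_format json."""
    for line in lines:
        yield json.loads(line)


def validate(rows, errors):
    """Yield rows that pass the check; write rejected rows to `errors` as JSON lines."""
    for row in rows:
        missing = [key for key in REQUIRED_KEYS if key not in row]
        if missing:
            errors.write(json.dumps({"missing": missing, "row": row}) + "\n")
            continue
        yield row


def convert(rows):
    """Hypothetical conversion: lower-case the 'level' field when present."""
    for row in rows:
        if "level" in row:
            row = dict(row, level=str(row["level"]).lower())
        yield row


stdin = io.StringIO('{"time": 1, "message": "a", "level": "WARN"}\n{"time": 2}\n')
errors = io.StringIO()
print(list(convert(validate(parse(stdin), errors))))
# [{'time': 1, 'message': 'a', 'level': 'warn'}]
print(errors.getvalue(), end="")
# {"missing": ["message"], "row": {"time": 2}}
```

Because every stage is a generator, each rejected row is written to `errors` as soon as it is read, rather than after the whole input has been consumed.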
It is customary to pass `__name__` to `logging.getLogger`. Here, though, the file is launched as the main script, so `__name__` is `'__main__'`. That is why the logger name is given as an explicit string instead.
Functions that read standard input are easier to test if they take the stream as a parameter. I couldn't find a way to drive the real standard input from a test, so I pass a StringIO instance containing the test data instead.
```python
# coding=utf-8
import io
import types

from myapp import fluent_stream_filter

class TestAll(object):
    def test_readline(self):
        """readline test: pass a StringIO instead of sys.stdin"""
        with open('/path/to/test_data.log') as f:
            stdin = io.StringIO(f.read())
        stream = fluent_stream_filter.readline(stdin)
        assert isinstance(stream, types.GeneratorType)
        # Lines read from a stream keep their trailing newline
        assert next(stream) == '{Contents of the first line}\n'

    def test_parse(self):
        """parse test"""
        stream = fluent_stream_filter.parse(iter(['{...}', '{...}']))
        assert isinstance(stream, types.GeneratorType)
        assert next(stream) == {...}
        assert next(stream) == {...}

    # (remaining tests omitted)
```
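If anything other than the processing results is written to standard output (a log line, for example), fluentd cannot parse the stream as MessagePack or JSON and reports an error. One possible accident is a console handler on the root logger unintentionally writing logs to standard output, for instance a `StreamHandler` configured with `args=(sys.stdout,)` in logging.conf. To prevent that, it is safest to set `logger.propagate = False` on the script's own logger. This only stops records from reaching ancestor handlers, so the logger's own handlers in logging.conf must not point at standard output either.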
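The effect of `propagate` can be checked with in-memory streams. In this sketch, a handler on the root logger stands in for an accidental console handler writing to standard output. The `fluent-exec-demo` logger (a hypothetical name) has its own handler, which stands in for standard error.

```python
import io
import logging


def demo(propagate):
    """Log one warning and return (what reached "stdout", what reached "stderr")."""
    fake_stdout, fake_stderr = io.StringIO(), io.StringIO()
    root = logging.getLogger()
    root_handler = logging.StreamHandler(fake_stdout)  # the accidental console handler
    root.addHandler(root_handler)

    logger = logging.getLogger("fluent-exec-demo")
    logger.setLevel(logging.INFO)
    own_handler = logging.StreamHandler(fake_stderr)  # where the filter's logs should go
    logger.addHandler(own_handler)
    logger.propagate = propagate
    try:
        logger.warning("bad row skipped")
    finally:
        # Undo the handler changes so repeated calls start from a clean state.
        root.removeHandler(root_handler)
        logger.removeHandler(own_handler)
    return fake_stdout.getvalue(), fake_stderr.getvalue()


print(demo(True))   # ('bad row skipped\n', 'bad row skipped\n'): the log leaks into stdout
print(demo(False))  # ('', 'bad row skipped\n'): stdout stays clean
```

In a real logging.conf, the filter's own handler would point at a file or at `sys.stderr` rather than at an in-memory stream.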