Aggregate Fluentd logs with Asakusa

Introduction

In this article, I will walk through a sample application that uses Asakusa to ingest and aggregate JSON Lines logs sent continuously from Fluentd. It has the same specifications as the application introduced in the previous article, Linking Asakusa on M3BP and DB; however, while that version read sales details from a database, this one reads the log files output by Fluentd. The data flow diagram is as follows. fluentd_m3bp_dfd.png

Operating environment

We have confirmed the operation in the following environment.

The source code is on GitHub.

Docker container settings

Two containers, fluentd and m3bp, are defined in Docker. The ./directio directory on the host OS is mounted into both containers, and the input data and batch result files are placed under it. The Asakusa source code is placed under ./workspace and is built and deployed for Asakusa on M3BP inside the m3bp container.

docker.png

docker-compose.yml


version: '3'
services:
  fluentd:
    build: ./fluentd
    ports:
      - "24224:24224"
    volumes:
      - ./fluentd/etc:/fluentd/etc
      - ./directio:/directio
    environment:
      - FLUENTD_CONF=fluent.conf
  m3bp:
    build: .
    volumes:
      - ./workspace:/workspace
      - ./directio:/directio

The Dockerfile for the m3bp container looks like this. It installs the packages needed to build Asakusa on M3BP.

Dockerfile


FROM centos:centos7

ENV ASAKUSA_HOME /workspace/asakusa
ENV JAVA_HOME /usr/lib/jvm/java
ENV PATH ${PATH}:${JAVA_HOME}/bin:${ASAKUSA_HOME}/bin
ADD ./workspace /workspace
ADD ./directio /directio
WORKDIR /workspace

RUN yum install -y cmake make gcc-c++ hwloc java-1.8.0-openjdk java-1.8.0-openjdk-devel

CMD ["bash", "/workspace/build.sh"]

Start the Docker container with the following command:

docker-compose up -d

Fluentd settings

Fluentd writes the sales detail log in JSON Lines format under the ./directio/sales directory every 30 seconds. In this sample, a fixed sales detail record is generated by the dummy plugin.

fluentd/etc/fluent.conf


<source>
  @type dummy
  tag sales.log
  dummy {"sales_date_time":"2019-04-01 10:30:00","item_code":"4922010001000","amount":2,"unit_selling_price":120,"selling_price":240}
</source>

<match sales.**>
  @type file
  path /directio/sales/sales
  time_slice_format %Y-%m-%d-%H%M
  <format>
    @type json
  </format>
  <buffer>
    @type file
    path /fluentd/buffer/
    flush_mode interval
    flush_interval 30s
    timekey 5m
  </buffer>
</match>

If you wait a while after the fluentd container starts, log files like the following are written every 30 seconds. The date and time in the file names change every 5 minutes.

ll directio/sales/
total 32
-rw-r--r--  1 suga  staff  4158  4 24 18:36 sales.2019-04-24-1835_0.log
-rw-r--r--  1 suga  staff  4032  4 24 18:37 sales.2019-04-24-1835_1.log
-rw-r--r--  1 suga  staff  3780  4 24 18:37 sales.2019-04-24-1835_2.log
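As the listing shows, every flush between 18:35 and 18:39 lands in the 18:35 slice. The mapping from timekey 5m to the %Y-%m-%d-%H%M part of the file name can be sketched as follows (a minimal illustration in Java that mimics fluentd's bucketing; it is not fluentd code, and the class name is made up):

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class TimeSliceDemo {
    // With timekey 5m, events are grouped into 5-minute buckets;
    // the bucket start time is what %Y-%m-%d-%H%M renders as.
    static String slice(LocalDateTime t) {
        LocalDateTime bucket = t.withMinute(t.getMinute() - t.getMinute() % 5)
                                .withSecond(0).withNano(0);
        return bucket.format(DateTimeFormatter.ofPattern("yyyy-MM-dd-HHmm"));
    }

    // Convenience overload for plain integer fields.
    static String slice(int year, int month, int day, int hour, int minute) {
        return slice(LocalDateTime.of(year, month, day, hour, minute));
    }
}
```

So a flush at 18:36 and another at 18:37 both fall into the sales.2019-04-24-1835 slice, which is why the `_0`, `_1`, `_2` suffixes accumulate within the same 5-minute window.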

DMDL settings

The sales detail model in JSON Lines format is defined using the Direct I/O JSON feature. (This feature was added in version 0.10.3.)

workspace/example-m3bp/src/main/dmdl/models.dmdl


"Sales details"
@directio.json(
    format = jsonl,
    datetime_format = "yyyy-MM-dd HH:mm:ss"
)
sales_detail = {
    sales_date_time : DATETIME;
    item_code : TEXT;
    amount : INT;
    unit_selling_price : INT;
    selling_price : INT;
};
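The datetime_format attribute appears to follow Java's date-and-time pattern letters, so the sales_date_time value in the dummy record parses with the same pattern string. A minimal sketch (this uses java.time directly; it is not Asakusa's actual parser, and the class name is made up for illustration):

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class DatetimeFormatDemo {
    // The same pattern string declared in the DMDL datetime_format attribute.
    static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    static LocalDateTime parse(String s) {
        return LocalDateTime.parse(s, FMT);
    }

    static int hourOf(String s) {
        return parse(s).getHour();
    }
}
```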

File input definition

The file name pattern to read is defined in getResourcePattern. ${date} is replaced with the value of the batch argument date.

workspace/example-m3bp/src/main/java/com/example/jobflow/SalesDetailFromJson.java


package com.example.jobflow;

import com.example.modelgen.dmdl.json.AbstractSalesDetailJsonInputDescription;

/**
 * Reads sales details with Direct I/O.
 * The input is every file under {@code sales}.
 */
public class SalesDetailFromJson extends AbstractSalesDetailJsonInputDescription {

    @Override
    public String getBasePath() {
        return "sales";
    }

    @Override
    public String getResourcePattern() {
        return "**/sales.${date}*.log";
    }

    @Override
    public DataSize getDataSize() {
        return DataSize.LARGE;
    }
}
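As a rough illustration of how the expanded pattern selects files, the glob semantics can be sketched with java.nio's PathMatcher (this is not Asakusa's actual matcher, and the class name is hypothetical):

```java
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;

public class ResourcePatternDemo {
    // Substitute the batch argument into the pattern, then test a
    // relative path under the base path against it as a glob.
    static boolean matches(String date, String relativePath) {
        PathMatcher m = FileSystems.getDefault()
                .getPathMatcher("glob:**/sales." + date + "*.log");
        return m.matches(Paths.get(relativePath));
    }
}
```

With date=2019-04-24 every file from that day matches; with date=2019-04-24-1835 only one 5-minute slice matches.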

Build and deploy Asakusa batch applications

Running the following command executes the /workspace/build.sh script set in the Dockerfile, which builds the Asakusa batch application in the m3bp container and deploys it to `ASAKUSA_HOME`.

docker-compose run m3bp

workspace/build.sh


#!/bin/bash
PRJ_HOME=/workspace/example-m3bp

cd ${PRJ_HOME}
./gradlew -g /workspace/lib assemble
rm -rf $ASAKUSA_HOME
mkdir $ASAKUSA_HOME
cd $ASAKUSA_HOME
tar xvf ${PRJ_HOME}/build/asakusafw-example-m3bp.tar.gz
java -jar tools/bin/setup.jar

Batch execution

The batch argument specifies the date and time of the JSON files to aggregate. In the following case, the JSON files whose names contain 2019-04-24 are read, and sales totals by category are calculated for that date.

docker-compose run m3bp asakusa run m3bp.example.summarizeSales -A date=2019-04-24

To aggregate a single 5-minute slice instead, execute as follows.

docker-compose run m3bp asakusa run m3bp.example.summarizeSales -A date=2019-04-24-1835

Check the result file

The result file is output to the following directory on the host OS.

cat directio/result/category/result.csv 
Category code,Sales quantity,Total sales
1600,850,102000
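The batch resolves each detail's category code (presumably by joining with an item master, as in the previous article) and then sums quantity and sales per category. The aggregation step alone can be sketched like this (the join is omitted, the class name is made up, and in the real application this work is done by Asakusa operators):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CategoryTotals {
    // Accumulate {sales quantity, total sales} per category code,
    // mirroring the columns of result.csv.
    static Map<String, long[]> aggregate(Object[][] rows) {
        Map<String, long[]> totals = new LinkedHashMap<>();
        for (Object[] r : rows) {
            String category = (String) r[0];
            long[] t = totals.computeIfAbsent(category, k -> new long[2]);
            t[0] += ((Number) r[1]).longValue(); // amount
            t[1] += ((Number) r[2]).longValue(); // selling_price
        }
        return totals;
    }
}
```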

Finally

In this article, we have looked at a simple sample that reads JSON Lines files. This approach can be applied in many ways, such as collecting IoT sensor data with Fluentd and processing it on M3BP or Hadoop, so I hope you find it helpful.
