In this article, I will explain a sample application that uses Asakusa to ingest and aggregate JSON Lines format logs sent continuously from Fluentd. It has the same specification as the Asakusa application introduced in the previous article, Linking Asakusa on M3BP and DB, but whereas that article read sales details from a database, this one reads the log output by Fluentd. The data flow diagram is as follows.
We have confirmed the operation in the following environment.
The source code is on GitHub.
Two containers, fluentd and m3bp, are defined with Docker Compose.
The ./directio directory on the host OS is mounted into both containers, and the input data and batch result files are placed under it.
The Asakusa source code is placed in ./workspace, and it is built and deployed for Asakusa on M3BP inside the m3bp container.
docker-compose.yml
version: '3'
services:
fluentd:
build: ./fluentd
ports:
- "24224:24224"
volumes:
- ./fluentd/etc:/fluentd/etc
- ./directio:/directio
environment:
- FLUENTD_CONF=fluent.conf
m3bp:
build: .
volumes:
- ./workspace:/workspace
- ./directio:/directio
The Dockerfile for the m3bp container looks like this. It installs the packages needed to build Asakusa on M3BP.
Dockerfile
FROM centos:centos7
ENV ASAKUSA_HOME /workspace/asakusa
ENV JAVA_HOME /usr/lib/jvm/java
ENV PATH ${PATH}:${JAVA_HOME}/bin:${ASAKUSA_HOME}/bin
ADD ./workspace /workspace
ADD ./directio /directio
WORKDIR /workspace
RUN yum install -y cmake make gcc-c++ hwloc java-1.8.0-openjdk java-1.8.0-openjdk-devel
CMD ["bash", "/workspace/build.sh"]
Start the Docker container with the following command:
docker-compose up -d
Fluentd writes the sales detail log in JSON Lines format under the ./directio/sales directory every 30 seconds.
In this sample, a fixed sales detail record is generated by the dummy plugin.
fluentd/etc/fluent.conf
<source>
@type dummy
tag sales.log
dummy {"sales_date_time":"2019-04-01 10:30:00","item_code":"4922010001000","amount":2,"unit_selling_price":120,"selling_price":240}
</source>
<match sales.**>
@type file
path /directio/sales/sales
time_slice_format %Y-%m-%d-%H%M
<format>
@type json
</format>
<buffer>
@type file
path /fluentd/buffer/
flush_mode interval
flush_interval 30s
timekey 5m
</buffer>
</match>
If you wait for a while after the fluentd container has started, log files like the following are output every 30 seconds. The date and time in the file name roll over every 5 minutes (timekey 5m).
ll directio/sales/
total 32
-rw-r--r-- 1 suga staff 4158 Apr 24 18:36 sales.2019-04-24-1835_0.log
-rw-r--r-- 1 suga staff 4032 Apr 24 18:37 sales.2019-04-24-1835_1.log
-rw-r--r-- 1 suga staff 3780 Apr 24 18:37 sales.2019-04-24-1835_2.log
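Each of these files contains one JSON object per line. As a minimal illustration (plain Python, not part of the Asakusa application; the sample lines below simply repeat the shape of the dummy record configured above), parsing and totaling such records looks like this, which is conceptually what the Asakusa batch later does at scale:

```python
import json

# Sample lines in the same shape as the dummy record emitted by Fluentd.
lines = [
    '{"sales_date_time":"2019-04-01 10:30:00","item_code":"4922010001000","amount":2,"unit_selling_price":120,"selling_price":240}',
    '{"sales_date_time":"2019-04-01 10:30:30","item_code":"4922010001000","amount":2,"unit_selling_price":120,"selling_price":240}',
]

total_amount = 0
total_selling_price = 0
for line in lines:
    record = json.loads(line)  # each line is a standalone JSON object
    total_amount += record["amount"]
    total_selling_price += record["selling_price"]

print(total_amount, total_selling_price)  # 4 480
```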
The sales detail model is defined in JSON Lines format using the Direct I/O JSON feature, which was added in Asakusa Framework 0.10.3.
workspace/example-m3bp/src/main/dmdl/models.dmdl
"Sales details"
@directio.json(
format = jsonl,
datetime_format = "yyyy-MM-dd HH:mm:ss"
)
sales_detail = {
sales_date_time : DATETIME;
item_code : TEXT;
amount : INT;
unit_selling_price : INT;
selling_price : INT;
};
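The datetime_format above uses Java-style pattern letters. As a side note (this Python snippet is only an illustration of the format, not part of the application), the equivalent strptime pattern parses the sales_date_time values like so:

```python
from datetime import datetime

# DMDL datetime_format "yyyy-MM-dd HH:mm:ss" corresponds to this strptime pattern.
dt = datetime.strptime("2019-04-01 10:30:00", "%Y-%m-%d %H:%M:%S")
print(dt)  # 2019-04-01 10:30:00
```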
The input file name pattern is defined in getResourcePattern. ${date} is replaced with the value of the batch argument date.
workspace/example-m3bp/src/main/java/com/example/jobflow/SalesDetailFromJson.java
package com.example.jobflow;
import com.example.modelgen.dmdl.json.AbstractSalesDetailJsonInputDescription;
/**
 * Inputs sales details with Direct I/O.
 * The input files are all files under {@code sales}.
 */
public class SalesDetailFromJson extends AbstractSalesDetailJsonInputDescription {
@Override
public String getBasePath() {
return "sales";
}
@Override
public String getResourcePattern() {
return "**/sales.${date}*.log";
}
@Override
public DataSize getDataSize() {
return DataSize.LARGE;
}
}
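The effect of the ${date} substitution can be sketched with Python's fnmatch (an approximation for illustration only; Direct I/O has its own pattern engine, and this sketch matches just the file-name part of "**/sales.${date}*.log" against the file names shown earlier):

```python
from fnmatch import fnmatch

files = [
    "sales.2019-04-24-1835_0.log",
    "sales.2019-04-24-1835_1.log",
    "sales.2019-04-23-0900_0.log",  # hypothetical file from another day
]

def select(files, date):
    # Approximates getResourcePattern() with ${date} already substituted.
    pattern = "sales." + date + "*.log"
    return [f for f in files if fnmatch(f, pattern)]

print(select(files, "2019-04-24"))       # daily run: both 04-24 files
print(select(files, "2019-04-24-1835"))  # 5-minute run: only that time slice
```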
Running the following command executes /workspace/build.sh (the CMD set in the Dockerfile), which builds the Asakusa batch application in the m3bp container and deploys it to $ASAKUSA_HOME.
docker-compose run m3bp
workspace/build.sh
#!/bin/bash
PRJ_HOME=/workspace/example-m3bp
cd ${PRJ_HOME}
./gradlew -g /workspace/lib assemble
rm -rf $ASAKUSA_HOME
mkdir $ASAKUSA_HOME
cd $ASAKUSA_HOME
tar xvf ${PRJ_HOME}/build/asakusafw-example-m3bp.tar.gz
java -jar tools/bin/setup.jar
The batch argument date specifies which JSON files are aggregated.
In the following case, the JSON files whose names contain 2019-04-24 are input, and the total sales by category are calculated for that date.
docker-compose run m3bp asakusa run m3bp.example.summarizeSales -A date=2019-04-24
If you want to calculate totals for a single 5-minute slice, execute as follows.
docker-compose run m3bp asakusa run m3bp.example.summarizeSales -A date=2019-04-24-1835
The result file is output to the following directory on the host OS.
cat directio/result/category/result.csv
Category code,Sales quantity,Total sales
1600,850,102000
In this article, we have walked through a simple sample that inputs JSON Lines format files. This approach can be applied in various ways, such as collecting IoT sensor data with Fluentd and processing it on M3BP or Hadoop, so I hope you find it helpful.