Dieser Artikel beschreibt eine Beispielanwendung, die Asakusa verwendet, um JSON Lines-Formatprotokolle einzugeben und zu aggregieren, die nacheinander von Fluentd gesendet werden. Es hat die gleichen Spezifikationen wie die Asakusa-Anwendung, die im vorherigen Artikel Verknüpfen von Asakusa auf M3BP und DB vorgestellt wurde, aber beim letzten Mal wurden Verkaufsdetails aus der Datenbank verknüpft. Geben Sie die Protokollausgabe von Fluentd ein. Das Datenflussdiagramm ist wie folgt.
Wir haben den Betrieb in der folgenden Umgebung bestätigt.
Der Quellcode ist auf GitHub zu finden.
Stellen Sie zwei Container, fließend und m3bp, auf Docker.
Das Verzeichnis . / Directio
auf dem Host-Betriebssystem ist in zwei Containern bereitgestellt.
Eingabedaten und Batch-Ergebnisdateien werden im Verzeichnis . / Directio
abgelegt.
Der Quellcode von Asakusa wird in . / Workspace
und Asakusa auf M3BP in den m3bp-Container gestellt. Erstellen und bereitstellen für.
docker-compose.yml
version: '3'
services:
fluentd:
build: ./fluentd
ports:
- "24224:24224"
volumes:
- ./fluentd/etc:/fluentd/etc
- ./directio:/directio
environment:
- FLUENTD_CONF=fluent.conf
m3bp:
build: .
volumes:
- ./workspace:/workspace
- ./directio:/directio
Die Docker-Datei für den m3bp-Container sieht folgendermaßen aus: Ich habe die Pakete installiert, die zum Erstellen von Asakusa auf M3BP benötigt werden.
Dockerfile
FROM centos:centos7
ENV ASAKUSA_HOME /workspace/asakusa
ENV JAVA_HOME /usr/lib/jvm/java
ENV PATH ${PATH}:${JAVA_HOME}/bin:${ASAKUSA_HOME}/bin
ADD ./workspace /workspace
ADD ./directio /directio
WORKDIR /workspace
RUN yum install -y cmake make gcc-c++ hwloc java-1.8.0-openjdk java-1.8.0-openjdk-devel
CMD ["bash", "/workspace/build.sh"]
Starten Sie den Docker-Container mit dem folgenden Befehl:
docker-compose up -d
Das Verkaufsabrechnungsprotokoll im JSON Lines-Format wird alle 30 Sekunden in das Verzeichnis . / Directio / sales
und darunter eingegeben und ausgegeben.
In diesem Beispiel generiert das Dummy-Plug-In eine feste Verkaufsabrechnung.
fluentd/etc/fluent.conf
<source>
@type dummy
tag sales.log
dummy {"sales_date_time":"2019-04-01 10:30:00","item_code":"4922010001000","amount":2,"unit_selling_price":120,"selling_price":240}
</source>
<match sales.**>
@type file
path /directio/sales/sales
time_slice_format %Y-%m-%d-%H%M
<format>
@type json
</format>
<buffer>
@type file
path /fluentd/buffer/
flush_mode interval
flush_interval 30s
timekey 5m
</buffer>
</match>
Wenn Sie nach dem Start des fließenden Containers eine Weile warten, wird alle 30 Sekunden ein Protokoll mit dem folgenden Dateinamen ausgegeben. Das Datum und die Uhrzeit des Dateinamens ändern sich alle 5 Minuten.
ll directio/sales/
total 32
-rw-r--r-- 1 suga staff 4158 4 24 18:36 sales.2019-04-24-1835_0.log
-rw-r--r-- 1 suga staff 4032 4 24 18:37 sales.2019-04-24-1835_1.log
-rw-r--r-- 1 suga staff 3780 4 24 18:37 sales.2019-04-24-1835_2.log
Direct I / O JSON * Definieren Sie mithilfe der Funktion Verkaufsdetails im JSON Lines-Format Ich bin. (* Dies ist eine Funktion, die ab 0.10.3 hinzugefügt wurde.)
workspace/example-m3bp/src/main/dmdl/models.dmdl
"Verkaufsdetails"
@directio.json(
format = jsonl,
datetime_format = "yyyy-MM-dd HH:mm:ss"
)
sales_detail = {
sales_date_time : DATETIME;
item_code : TEXT;
amount : INT;
unit_selling_price : INT;
selling_price : INT;
};
Das Muster des einzugebenden Dateinamens ist in getResourcePattaern definiert. $ {date} wird durch den Wert des Batch-Arguments
date` ersetzt.
workspace/example-m3bp/src/main/java/com/example/jobflow/SalesDetailFromJson.java
package com.example.jobflow;
import com.example.modelgen.dmdl.json.AbstractSalesDetailJsonInputDescription;
/**
*Direkt I Verkaufsabrechnung/Geben Sie mit O ein.
*Die Eingabedatei ist{@code sales}Alle oben genannten Dateien.
*/
public class SalesDetailFromJson extends AbstractSalesDetailJsonInputDescription {
@Override
public String getBasePath() {
return "sales";
}
@Override
public String getResourcePattern() {
return "**/sales.${date}*.log";
}
@Override
public DataSize getDataSize() {
return DataSize.LARGE;
}
}
Der in der Docker-Datei festgelegte Befehl / workspace / build.sh
wird mit dem folgenden Befehl ausgeführt, um die Asakusa-Batch-Anwendung im m3bp-Container zu erstellen und in ASAKUSA_HOME
bereitzustellen.
docker-compose run m3bp
workspace/build.sh
#!/bin/bash
PRJ_HOME=/workspace/example-m3bp
cd ${PRJ_HOME}
./gradlew -g /workspace/lib assemble
rm -rf $ASAKUSA_HOME
mkdir $ASAKUSA_HOME
cd $ASAKUSA_HOME
tar xvf ${PRJ_HOME}/build/asakusafw-example-m3bp.tar.gz
java -jar tools/bin/setup.jar
Datum und Uhrzeit werden angegeben, um die JSON-Datei anzugeben, die im Batch-Argument aggregiert werden soll. In den folgenden Fällen wird eine JSON-Datei mit dem Namen "2019-04-24" eingegeben und der Gesamtumsatz nach Kategorie für das angegebene Datum berechnet.
docker-compose run m3bp asakusa run m3bp.example.summarizeSales -A date=2019-04-24
Wenn Sie die Summe alle 5 Minuten berechnen möchten, gehen Sie wie folgt vor.
docker-compose run m3bp asakusa run m3bp.example.summarizeSales -A date=2019-04-24-1835
Die Ergebnisdatei wird in das folgende Verzeichnis auf dem Host-Betriebssystem ausgegeben.
cat directio/result/category/result.csv
Kategoriecode,Verkaufsmenge,Gesamtumsatz
1600,850,102000
In diesem Artikel haben wir ein einfaches Beispiel für die Eingabe von Dateien im JSON Lines-Format gesehen. Ich denke, dass es auf verschiedene Arten angewendet werden kann, wie zum Beispiel das Sammeln von IoT-Sensordaten mit fluentd und das Verarbeiten auf M3BP oder Hadoop, also hoffe ich, dass es hilfreich sein wird.