Fließen Sie Fluentd-Protokolle mit Asakusa zusammen

Einführung

Dieser Artikel beschreibt eine Beispielanwendung, die Asakusa verwendet, um JSON Lines-Formatprotokolle einzugeben und zu aggregieren, die nacheinander von Fluentd gesendet werden. Es hat die gleichen Spezifikationen wie die Asakusa-Anwendung, die im vorherigen Artikel Verknüpfen von Asakusa auf M3BP und DB vorgestellt wurde, aber beim letzten Mal wurden Verkaufsdetails aus der Datenbank verknüpft. Geben Sie die Protokollausgabe von Fluentd ein. Das Datenflussdiagramm ist wie folgt. fluentd_m3bp_dfd.png

Betriebsumgebung

Wir haben den Betrieb in der folgenden Umgebung bestätigt.

Der Quellcode ist auf GitHub zu finden.

Docker-Container-Einstellungen

Stellen Sie zwei Container, fließend und m3bp, auf Docker. Das Verzeichnis . / Directio auf dem Host-Betriebssystem ist in zwei Containern bereitgestellt. Eingabedaten und Batch-Ergebnisdateien werden im Verzeichnis . / Directio abgelegt. Der Quellcode von Asakusa wird in . / Workspace und Asakusa auf M3BP in den m3bp-Container gestellt. Erstellen und bereitstellen für.

docker.png

docker-compose.yml


version: '3'
services:
  fluentd:
    build: ./fluentd
    ports:
      - "24224:24224"
    volumes:
      - ./fluentd/etc:/fluentd/etc
      - ./directio:/directio
    environment:
      - FLUENTD_CONF=fluent.conf
  m3bp:
    build: .
    volumes:
      - ./workspace:/workspace
      - ./directio:/directio

Die Docker-Datei für den m3bp-Container sieht folgendermaßen aus: Ich habe die Pakete installiert, die zum Erstellen von Asakusa auf M3BP benötigt werden.

Dockerfile


FROM centos:centos7

ENV ASAKUSA_HOME /workspace/asakusa
ENV JAVA_HOME /usr/lib/jvm/java
ENV PATH ${PATH}:${JAVA_HOME}/bin:${ASAKUSA_HOME}/bin
ADD ./workspace /workspace
ADD ./directio /directio
WORKDIR /workspace

RUN yum install -y cmake make gcc-c++ hwloc java-1.8.0-openjdk java-1.8.0-openjdk-devel

CMD ["bash", "/workspace/build.sh"]

Starten Sie den Docker-Container mit dem folgenden Befehl:

docker-compose up -d

Fließende Einstellungen

Das Verkaufsabrechnungsprotokoll im JSON Lines-Format wird alle 30 Sekunden in das Verzeichnis . / Directio / sales und darunter eingegeben und ausgegeben. In diesem Beispiel generiert das Dummy-Plug-In eine feste Verkaufsabrechnung.

fluentd/etc/fluent.conf


<source>
  @type dummy
  tag sales.log
  dummy {"sales_date_time":"2019-04-01 10:30:00","item_code":"4922010001000","amount":2,"unit_selling_price":120,"selling_price":240}
</source>

<match sales.**>
  @type file
  path /directio/sales/sales
  time_slice_format %Y-%m-%d-%H%M
  <format>
    @type json
  </format>
  <buffer>
    @type file
    path /fluentd/buffer/
    flush_mode interval
    flush_interval 30s
    timekey 5m
  </buffer>
</match>

Wenn Sie nach dem Start des fließenden Containers eine Weile warten, wird alle 30 Sekunden ein Protokoll mit dem folgenden Dateinamen ausgegeben. Das Datum und die Uhrzeit des Dateinamens ändern sich alle 5 Minuten.

ll directio/sales/
total 32
-rw-r--r--  1 suga  staff  4158  4 24 18:36 sales.2019-04-24-1835_0.log
-rw-r--r--  1 suga  staff  4032  4 24 18:37 sales.2019-04-24-1835_1.log
-rw-r--r--  1 suga  staff  3780  4 24 18:37 sales.2019-04-24-1835_2.log

DMDL-Einstellungen

Direct I / O JSON * Definieren Sie mithilfe der Funktion Verkaufsdetails im JSON Lines-Format Ich bin. (* Dies ist eine Funktion, die ab 0.10.3 hinzugefügt wurde.)

workspace/example-m3bp/src/main/dmdl/models.dmdl


"Verkaufsdetails"
@directio.json(
    format = jsonl,
    datetime_format = "yyyy-MM-dd HH:mm:ss"
)
sales_detail = {
    sales_date_time : DATETIME;
    item_code : TEXT;
    amount : INT;
    unit_selling_price : INT;
    selling_price : INT;
};

Festlegen von Informationen zur Definition der Dateieingabe

Das Muster des einzugebenden Dateinamens ist in getResourcePattaern definiert. $ {date} wird durch den Wert des Batch-Arguments date` ersetzt.

workspace/example-m3bp/src/main/java/com/example/jobflow/SalesDetailFromJson.java


package com.example.jobflow;

import com.example.modelgen.dmdl.json.AbstractSalesDetailJsonInputDescription;

/**
 *Direkt I Verkaufsabrechnung/Geben Sie mit O ein.
 *Die Eingabedatei ist{@code sales}Alle oben genannten Dateien.
 */
public class SalesDetailFromJson extends AbstractSalesDetailJsonInputDescription {

    @Override
    public String getBasePath() {
        return "sales";
    }

    @Override
    public String getResourcePattern() {
        return "**/sales.${date}*.log";
    }

    @Override
    public DataSize getDataSize() {
        return DataSize.LARGE;
    }
}

Erstellen und Bereitstellen von Asakusa-Batchanwendungen

Der in der Docker-Datei festgelegte Befehl / workspace / build.sh wird mit dem folgenden Befehl ausgeführt, um die Asakusa-Batch-Anwendung im m3bp-Container zu erstellen und in ASAKUSA_HOME bereitzustellen.

docker-compose run m3bp

workspace/build.sh


#!/bin/bash
PRJ_HOME=/workspace/example-m3bp

cd ${PRJ_HOME}
./gradlew -g /workspace/lib assemble
rm -rf $ASAKUSA_HOME
mkdir $ASAKUSA_HOME
cd $ASAKUSA_HOME
tar xvf ${PRJ_HOME}/build/asakusafw-example-m3bp.tar.gz
java -jar tools/bin/setup.jar

Stapelausführung

Datum und Uhrzeit werden angegeben, um die JSON-Datei anzugeben, die im Batch-Argument aggregiert werden soll. In den folgenden Fällen wird eine JSON-Datei mit dem Namen "2019-04-24" eingegeben und der Gesamtumsatz nach Kategorie für das angegebene Datum berechnet.

docker-compose run m3bp asakusa run m3bp.example.summarizeSales -A date=2019-04-24

Wenn Sie die Summe alle 5 Minuten berechnen möchten, gehen Sie wie folgt vor.

docker-compose run m3bp asakusa run m3bp.example.summarizeSales -A date=2019-04-24-1835

Überprüfen Sie die Ergebnisdatei

Die Ergebnisdatei wird in das folgende Verzeichnis auf dem Host-Betriebssystem ausgegeben.

cat directio/result/category/result.csv 
Kategoriecode,Verkaufsmenge,Gesamtumsatz
1600,850,102000

Schließlich

In diesem Artikel haben wir ein einfaches Beispiel für die Eingabe von Dateien im JSON Lines-Format gesehen. Ich denke, dass es auf verschiedene Arten angewendet werden kann, wie zum Beispiel das Sammeln von IoT-Sensordaten mit fluentd und das Verarbeiten auf M3BP oder Hadoop, also hoffe ich, dass es hilfreich sein wird.

Recommended Posts

Fließen Sie Fluentd-Protokolle mit Asakusa zusammen
Hallo Welt, mit Asakusa Framework!