Agréger les journaux Fluentd avec Asakusa

introduction

Cet article décrit un exemple d'application qui utilise Asakusa pour saisir et agréger les journaux au format JSON Lines qui sont envoyés séquentiellement à partir de Fluentd. Il a les mêmes spécifications que l'application Asakusa introduite dans l'article précédent Linking Asakusa on M3BP and DB, mais la dernière fois, il a lié les détails de vente de Database. Entrez la sortie du journal de Fluentd. Le diagramme de flux de données est le suivant. fluentd_m3bp_dfd.png

Environnement d'exploitation

Nous avons confirmé l'opération dans l'environnement suivant.

Le code source peut être trouvé sur GitHub.

Paramètres du conteneur Docker

Définissez deux conteneurs, fluentd et m3bp, sur docker. Le répertoire . / Directio sur le système d'exploitation hôte est monté dans deux conteneurs. Les données d'entrée et les fichiers de résultats de lots sont placés sous le répertoire . / Directio. Le code source d'Asakusa est placé dans . / Workspace, et Asakusa sur M3BP est placé sur le conteneur m3bp. Construisez et déployez pour.

docker.png

docker-compose.yml


version: '3'
services:
  fluentd:
    build: ./fluentd
    ports:
      - "24224:24224"
    volumes:
      - ./fluentd/etc:/fluentd/etc
      - ./directio:/directio
    environment:
      - FLUENTD_CONF=fluent.conf
  m3bp:
    build: .
    volumes:
      - ./workspace:/workspace
      - ./directio:/directio

Le Dockerfile pour le conteneur m3bp ressemble à ceci: J'ai installé les packages nécessaires pour construire Asakusa sur M3BP.

Dockerfile


FROM centos:centos7

ENV ASAKUSA_HOME /workspace/asakusa
ENV JAVA_HOME /usr/lib/jvm/java
ENV PATH ${PATH}:${JAVA_HOME}/bin:${ASAKUSA_HOME}/bin
ADD ./workspace /workspace
ADD ./directio /directio
WORKDIR /workspace

RUN yum install -y cmake make gcc-c++ hwloc java-1.8.0-openjdk java-1.8.0-openjdk-devel

CMD ["bash", "/workspace/build.sh"]

Démarrez le conteneur Docker avec la commande suivante:

docker-compose up -d

Paramètres Fluentd

Le journal des relevés de vente au format JSON Lines est entré et sorti dans le répertoire . / Directio / sales et en dessous toutes les 30 secondes. Dans cet exemple, le plug-in factice génère un relevé de vente fixe.

fluentd/etc/fluent.conf


<source>
  @type dummy
  tag sales.log
  dummy {"sales_date_time":"2019-04-01 10:30:00","item_code":"4922010001000","amount":2,"unit_selling_price":120,"selling_price":240}
</source>

<match sales.**>
  @type file
  path /directio/sales/sales
  time_slice_format %Y-%m-%d-%H%M
  <format>
    @type json
  </format>
  <buffer>
    @type file
    path /fluentd/buffer/
    flush_mode interval
    flush_interval 30s
    timekey 5m
  </buffer>
</match>

Si vous attendez un moment après le démarrage du conteneur fluentd, un journal sera généré toutes les 30 secondes avec le nom de fichier suivant. La date et l'heure du nom de fichier changent toutes les 5 minutes.

ll directio/sales/
total 32
-rw-r--r--  1 suga  staff  4158  4 24 18:36 sales.2019-04-24-1835_0.log
-rw-r--r--  1 suga  staff  4032  4 24 18:37 sales.2019-04-24-1835_1.log
-rw-r--r--  1 suga  staff  3780  4 24 18:37 sales.2019-04-24-1835_2.log

Paramètres DMDL

Direct I / O JSON * Définissez les détails des ventes au format JSON Lines en utilisant la fonction * Je suis. (* Ceci est une fonction ajoutée à partir de 0.10.3)

workspace/example-m3bp/src/main/dmdl/models.dmdl


"Détails des ventes"
@directio.json(
    format = jsonl,
    datetime_format = "yyyy-MM-dd HH:mm:ss"
)
sales_detail = {
    sales_date_time : DATETIME;
    item_code : TEXT;
    amount : INT;
    unit_selling_price : INT;
    selling_price : INT;
};

Définition des informations de définition d'entrée de fichier

Le modèle du nom de fichier à saisir est défini dans getResourcePattaern. $ {date} est remplacé par la valeur de l'argument de lot date.

workspace/example-m3bp/src/main/java/com/example/jobflow/SalesDetailFromJson.java


package com.example.jobflow;

import com.example.modelgen.dmdl.json.AbstractSalesDetailJsonInputDescription;

/**
 *Déclaration de vente directe I/Entrez avec O.
 *Le fichier d'entrée est{@code sales}Tous les fichiers ci-dessus.
 */
public class SalesDetailFromJson extends AbstractSalesDetailJsonInputDescription {

    @Override
    public String getBasePath() {
        return "sales";
    }

    @Override
    public String getResourcePattern() {
        return "**/sales.${date}*.log";
    }

    @Override
    public DataSize getDataSize() {
        return DataSize.LARGE;
    }
}

Créez et déployez des applications par lots Asakusa

La commande / workspace / build.sh définie dans le Dockerfile est exécutée avec la commande suivante pour créer l'application batch Asakusa dans le conteneur m3bp et la déployer sur ʻASAKUSA_HOME`.

docker-compose run m3bp

workspace/build.sh


#!/bin/bash
PRJ_HOME=/workspace/example-m3bp

cd ${PRJ_HOME}
./gradlew -g /workspace/lib assemble
rm -rf $ASAKUSA_HOME
mkdir $ASAKUSA_HOME
cd $ASAKUSA_HOME
tar xvf ${PRJ_HOME}/build/asakusafw-example-m3bp.tar.gz
java -jar tools/bin/setup.jar

Exécution par lots

La date et l'heure sont spécifiées pour spécifier le fichier JSON à agréger dans l'argument de lot. Dans les cas suivants, un fichier JSON contenant «2019-04-24» sera saisi et le total des ventes par catégorie pour la date spécifiée sera calculé.

docker-compose run m3bp asakusa run m3bp.example.summarizeSales -A date=2019-04-24

Si vous souhaitez calculer le total toutes les 5 minutes, procédez comme suit.

docker-compose run m3bp asakusa run m3bp.example.summarizeSales -A date=2019-04-24-1835

Vérifiez le fichier de résultats

Le fichier de résultat est sorti dans le répertoire suivant sur le système d'exploitation hôte.

cat directio/result/category/result.csv 
Code de catégorie,Quantité de vente,Ventes totales
1600,850,102000

finalement

Dans cet article, nous avons vu un exemple simple pour saisir des fichiers au format JSON Lines. Je pense qu'il peut être appliqué de différentes manières, telles que la collecte de données de capteurs IoT avec fluentd et leur traitement sur M3BP ou Hadoop, j'espère donc que cela vous sera utile.

Recommended Posts

Agréger les journaux Fluentd avec Asakusa
Bonjour tout le monde! Avec Asakusa Framework!