Cet article décrit un exemple d'application qui utilise Asakusa pour saisir et agréger les journaux au format JSON Lines qui sont envoyés séquentiellement à partir de Fluentd. Il a les mêmes spécifications que l'application Asakusa introduite dans l'article précédent Linking Asakusa on M3BP and DB, mais la dernière fois, il a lié les détails de vente de Database. Entrez la sortie du journal de Fluentd. Le diagramme de flux de données est le suivant.
Nous avons confirmé l'opération dans l'environnement suivant.
Le code source peut être trouvé sur GitHub.
Définissez deux conteneurs, fluentd et m3bp, sur docker.
Le répertoire . / Directio
sur le système d'exploitation hôte est monté dans deux conteneurs.
Les données d'entrée et les fichiers de résultats de lots sont placés sous le répertoire . / Directio
.
Le code source d'Asakusa est placé dans . / Workspace
, et Asakusa sur M3BP est placé sur le conteneur m3bp. Construisez et déployez pour.
docker-compose.yml
version: '3'
services:
fluentd:
build: ./fluentd
ports:
- "24224:24224"
volumes:
- ./fluentd/etc:/fluentd/etc
- ./directio:/directio
environment:
- FLUENTD_CONF=fluent.conf
m3bp:
build: .
volumes:
- ./workspace:/workspace
- ./directio:/directio
Le Dockerfile pour le conteneur m3bp ressemble à ceci: J'ai installé les packages nécessaires pour construire Asakusa sur M3BP.
Dockerfile
FROM centos:centos7
ENV ASAKUSA_HOME /workspace/asakusa
ENV JAVA_HOME /usr/lib/jvm/java
ENV PATH ${PATH}:${JAVA_HOME}/bin:${ASAKUSA_HOME}/bin
ADD ./workspace /workspace
ADD ./directio /directio
WORKDIR /workspace
RUN yum install -y cmake make gcc-c++ hwloc java-1.8.0-openjdk java-1.8.0-openjdk-devel
CMD ["bash", "/workspace/build.sh"]
Démarrez le conteneur Docker avec la commande suivante:
docker-compose up -d
Le journal des relevés de vente au format JSON Lines est entré et sorti dans le répertoire . / Directio / sales
et en dessous toutes les 30 secondes.
Dans cet exemple, le plug-in factice génère un relevé de vente fixe.
fluentd/etc/fluent.conf
<source>
@type dummy
tag sales.log
dummy {"sales_date_time":"2019-04-01 10:30:00","item_code":"4922010001000","amount":2,"unit_selling_price":120,"selling_price":240}
</source>
<match sales.**>
@type file
path /directio/sales/sales
time_slice_format %Y-%m-%d-%H%M
<format>
@type json
</format>
<buffer>
@type file
path /fluentd/buffer/
flush_mode interval
flush_interval 30s
timekey 5m
</buffer>
</match>
Si vous attendez un moment après le démarrage du conteneur fluentd, un journal sera généré toutes les 30 secondes avec le nom de fichier suivant. La date et l'heure du nom de fichier changent toutes les 5 minutes.
ll directio/sales/
total 32
-rw-r--r-- 1 suga staff 4158 4 24 18:36 sales.2019-04-24-1835_0.log
-rw-r--r-- 1 suga staff 4032 4 24 18:37 sales.2019-04-24-1835_1.log
-rw-r--r-- 1 suga staff 3780 4 24 18:37 sales.2019-04-24-1835_2.log
Direct I / O JSON * Définissez les détails des ventes au format JSON Lines en utilisant la fonction * Je suis. (* Ceci est une fonction ajoutée à partir de 0.10.3)
workspace/example-m3bp/src/main/dmdl/models.dmdl
"Détails des ventes"
@directio.json(
format = jsonl,
datetime_format = "yyyy-MM-dd HH:mm:ss"
)
sales_detail = {
sales_date_time : DATETIME;
item_code : TEXT;
amount : INT;
unit_selling_price : INT;
selling_price : INT;
};
Le modèle du nom de fichier à saisir est défini dans getResourcePattaern
. $ {date}
est remplacé par la valeur de l'argument de lot date
.
workspace/example-m3bp/src/main/java/com/example/jobflow/SalesDetailFromJson.java
package com.example.jobflow;
import com.example.modelgen.dmdl.json.AbstractSalesDetailJsonInputDescription;
/**
*Déclaration de vente directe I/Entrez avec O.
*Le fichier d'entrée est{@code sales}Tous les fichiers ci-dessus.
*/
public class SalesDetailFromJson extends AbstractSalesDetailJsonInputDescription {
@Override
public String getBasePath() {
return "sales";
}
@Override
public String getResourcePattern() {
return "**/sales.${date}*.log";
}
@Override
public DataSize getDataSize() {
return DataSize.LARGE;
}
}
La commande / workspace / build.sh
définie dans le Dockerfile est exécutée avec la commande suivante pour créer l'application batch Asakusa dans le conteneur m3bp et la déployer sur ʻASAKUSA_HOME`.
docker-compose run m3bp
workspace/build.sh
#!/bin/bash
PRJ_HOME=/workspace/example-m3bp
cd ${PRJ_HOME}
./gradlew -g /workspace/lib assemble
rm -rf $ASAKUSA_HOME
mkdir $ASAKUSA_HOME
cd $ASAKUSA_HOME
tar xvf ${PRJ_HOME}/build/asakusafw-example-m3bp.tar.gz
java -jar tools/bin/setup.jar
La date et l'heure sont spécifiées pour spécifier le fichier JSON à agréger dans l'argument de lot. Dans les cas suivants, un fichier JSON contenant «2019-04-24» sera saisi et le total des ventes par catégorie pour la date spécifiée sera calculé.
docker-compose run m3bp asakusa run m3bp.example.summarizeSales -A date=2019-04-24
Si vous souhaitez calculer le total toutes les 5 minutes, procédez comme suit.
docker-compose run m3bp asakusa run m3bp.example.summarizeSales -A date=2019-04-24-1835
Le fichier de résultat est sorti dans le répertoire suivant sur le système d'exploitation hôte.
cat directio/result/category/result.csv
Code de catégorie,Quantité de vente,Ventes totales
1600,850,102000
Dans cet article, nous avons vu un exemple simple pour saisir des fichiers au format JSON Lines. Je pense qu'il peut être appliqué de différentes manières, telles que la collecte de données de capteurs IoT avec fluentd et leur traitement sur M3BP ou Hadoop, j'espère donc que cela vous sera utile.