Lier Asakusa sur M3 BP et DB

introduction

Dans l'article précédent Integrating Asakusa Framework and Impala to visualize with BI tools, traitement de données à grande échelle sur Hadoop Introduction de la méthode de mise en œuvre de l'architecture en supposant. Cette fois, nous présenterons la méthode d'implémentation pour relier les données commerciales de la base de données à Asakusa sur M3BP. .. La figure ci-dessous résume les caractéristiques de l'architecture.

m3bp_windgate.png

Ce que vous voulez faire dans cet article

L'exemple d'application entre le tableau "Détails des ventes" et le tableau "Produit principal" de la base de données, agrège le montant des ventes pour chaque catégorie et le sort dans le tableau "Agrégation des ventes par catégorie".

Traitement de "1. Combinaison de fiches produits"

Traitement de "2. Agrégation par catégorie" Additionner la quantité et le montant des ventes en utilisant le code de catégorie comme clé

summarizeSales_dfd.png

Les définitions des tables d'entrée et de sortie sont les suivantes:

article Moule PK
Date et heure de vente (SALES_DATE_TIME) DATETIME
Code produit (ITEM_CODE) VARCHAR(13)
Quantité (AMOUNT) INT
article Moule PK
Code produit (ITEM_CODE) VARCHAR(13)
Nom du produit (ITEM_NAME) VARCHAR(128)
Code de catégorie (CATÉGORIE_CODE) CHAR(4)
Nom de la catégorie (CATÉGORIE_NAME) CHAR(128)
Prix unitaire (UNIT_PRICE) INT
Date d'enregistrement principal (ENREGISTRÉ)_DATE) DATE
Date de début de la demande principale (BEGIN)_DATE) DATE
Date de fin de la demande principale (FIN_DATE) DATE
article Moule PK
Code de catégorie (CATÉGORIE_CODE) CHAR(4)
Nom de la catégorie (CATÉGORIE_NAME) CHAR(128)
Quantité (AMOUNT) INT
Montant des ventes (VENTE_PRICE) INT

Environnement d'exploitation

Nous avons confirmé l'opération dans l'environnement suivant.

Comme travail préliminaire, le logiciel ci-dessus (MySQL, JDK, Gradle) doit être installé sur CentOS.

Le code source peut être trouvé sur GitHub.

Préparation des données DB

demo.sql


CREATE DATABASE demo;
CREATE USER demo@'%' IDENTIFIED BY 'demo';
GRANT ALL ON demo.* TO demo@'%';
CREATE USER demo@localhost IDENTIFIED BY 'demo';
GRANT ALL ON demo.* TO demo@localhost;

CREATE TABLE demo.SALES_DETAIL
(
	SALES_DATE_TIME DATETIME NOT NULL COMMENT 'Date et heure de vente',
	ITEM_CODE VARCHAR(13) NOT NULL COMMENT 'Code produit',
	AMOUNT INT NOT NULL COMMENT 'quantité',
	PRIMARY KEY(SALES_DATE_TIME, ITEM_CODE)
) COMMENT = 'Détails des ventes';

CREATE TABLE demo.ITEM_INFO
(
	ITEM_CODE VARCHAR(13) NOT NULL COMMENT 'Code produit',
	ITEM_NAME VARCHAR(128) COMMENT 'Nom du produit',
	CATEGORY_CODE CHAR(4) NOT NULL COMMENT 'Code de catégorie',
	CATEGORY_NAME VARCHAR(128) COMMENT 'Nom de catégorie',
	UNIT_PRICE INT NOT NULL COMMENT 'Prix unitaire',
	REGISTERED_DATE DATE NOT NULL COMMENT 'Date d'enregistrement principal',
	BEGIN_DATE DATE NOT NULL COMMENT 'Date de début de la demande principale',
	END_DATE DATE NOT NULL COMMENT 'Date de fin de la demande principale',
	PRIMARY KEY(ITEM_CODE, BEGIN_DATE)
) COMMENT = 'Maître de produit';

CREATE TABLE demo.CATEGORY_SUMMARY
(
	CATEGORY_CODE CHAR(4) NOT NULL COMMENT 'Code de catégorie',
	CATEGORY_NAME VARCHAR(128) COMMENT 'Nom de catégorie',
	AMOUNT INT NOT NULL COMMENT 'quantité',
	SELLING_PRICE INT NOT NULL COMMENT 'Montant des ventes'
) COMMENT = 'Ventes totales par catégorie';

INSERT INTO SALES_DETAIL VALUES ('2017-03-31 23:59:59','4922010001000',3);
INSERT INTO SALES_DETAIL VALUES ('2017-04-01 10:30:00','4922010001000',3);
INSERT INTO SALES_DETAIL VALUES ('2017-04-01 10:31:00','4922010001001',2);
INSERT INTO SALES_DETAIL VALUES ('2017-04-01 10:32:00','4922010001000',1);
INSERT INTO SALES_DETAIL VALUES ('2017-04-01 10:33:00','4922010001002',1);
INSERT INTO SALES_DETAIL VALUES ('2017-04-01 10:35:00','4922020002000',3);
INSERT INTO SALES_DETAIL VALUES ('2017-04-01 10:36:00','4922020002001',1);
INSERT INTO SALES_DETAIL VALUES ('2017-04-01 10:38:00','4922020002000',1);
INSERT INTO SALES_DETAIL VALUES ('2017-04-01 10:39:00','4922010001002',1);
INSERT INTO SALES_DETAIL VALUES ('2017-04-01 10:41:00','4922010001001',2);
INSERT INTO SALES_DETAIL VALUES ('2017-04-01 10:42:00','4922010001002',1);
INSERT INTO SALES_DETAIL VALUES ('2017-04-02 10:10:00','4922020002002',2);

INSERT INTO ITEM_INFO VALUES ('4922010001000','Chocolat au lait M','1600','bonbons au chocolat',120,'2017-04-01','2017-04-01','2018-01-19');
INSERT INTO ITEM_INFO VALUES ('4922010001000','Chocolat au lait M','1600','bonbons au chocolat',130,'2018-01-20','2018-01-20','2019-12-31');
INSERT INTO ITEM_INFO VALUES ('4922010001001','Chocolat assorti PREMIUM','1600','bonbons au chocolat',330,'2017-04-01','2017-04-01','2019-12-31');
INSERT INTO ITEM_INFO VALUES ('4922010001002','Mini croquant aux amandes','1600','bonbons au chocolat',140,'2017-04-01','2017-04-01','2019-12-31');
INSERT INTO ITEM_INFO VALUES ('4922020002000','Coupe de sauce soja aux nouilles','1401','Coupe de nouilles',98,'2017-04-01','2017-04-01','2019-12-31');
INSERT INTO ITEM_INFO VALUES ('4922020002001','Coupe de sel de nouilles','1401','Coupe de nouilles',98,'2017-04-01','2017-04-01','2019-12-31');
INSERT INTO ITEM_INFO VALUES ('4922020002002','Coupe de nouilles au curry','1401','Coupe de nouilles',120,'2017-04-01','2017-04-01','2019-12-31');

commit;

Reflète le fichier SQL créé.

mysql -u demo -p demo < demo.sql

Préparation de l'environnement de développement

Créez un dossier de projet. Dans cet exemple, nous travaillerons sur les dossiers suivants.

mkdir asakusa-example-windgate

Créez un fichier de script Gradle sous le dossier du projet. Les paramètres suivants sont principalement ajoutés.

Pour plus d'informations sur les scripts Gradle, reportez-vous à la référence du plug-in Asakusa Gradle (http://docs.asakusafw.com/latest/release/ja/html/application/gradle-plugin-reference.html).

build.gradle


group 'com.example'

buildscript {
    repositories {
        maven { url 'http://asakusafw.s3.amazonaws.com/maven/releases' }
        maven { url 'http://asakusafw.s3.amazonaws.com/maven/snapshots' }
    }
    dependencies {
        classpath group: 'com.asakusafw.gradle', name: 'asakusa-distribution', version: '0.10.0'
    }
}

apply plugin: 'asakusafw-sdk'
apply plugin: 'asakusafw-organizer'
apply plugin: 'asakusafw-m3bp'
apply plugin: 'eclipse'

asakusafw {
    m3bp {
        option 'm3bp.native.cmake.CMAKE_TOOLCHAIN_FILE', System.getProperty('m3bp.toolchain')
        option 'windgate.jdbc.direct', '*'
    }
}

asakusafwOrganizer {
    profiles.prod {
        hadoop.embed true
        assembly.into('.') {
            put 'src/dist/prod'
        }
    }
    extension {
        libraries += ['mysql:mysql-connector-java:5.1.45']
    }
}

Sous le dossier du projet, exécutez la commande Gradle ci-dessous pour l'importer en tant que projet Eclipse. Si vous souhaitez utiliser IntelliJ IDEA, veuillez vous reporter à Utilisation d'IntelliJ officiel. Si vous utilisez Eclipse, pensez à utiliser Shafu (plug-in Eclipse pour le développement Asakusa).

gradle eclipse

Paramètres M3BP

Les paramètres de M3BP sont décrits dans le fichier m3bp.properties. Cette section décrit les paramètres liés au pilote JDBC pour WindGate JDBC Direct Mode. «Exemple» inclus dans la clé de propriété suivante correspond à «profileName» défini dans la classe de définition d'entrée / sortie. Pour plus d'informations, consultez Asakusa sur les paramètres d'optimisation M3BP (http://docs.asakusafw.com/latest/release/ja/html/m3bp/optimization.html).

src/dist/prod/m3bp/conf/m3bp.properties


com.asakusafw.dag.jdbc.example.url=jdbc:mysql://localhost/demo
com.asakusafw.dag.jdbc.example.driver=com.mysql.jdbc.Driver
com.asakusafw.dag.jdbc.example.properties.user=demo
com.asakusafw.dag.jdbc.example.properties.password=demo

Génération de classe de modèle de données

DMDL (Data Model Definition Language) Créez un fichier de script. Le modèle d'entrée «détails des ventes» et «produit de base» et le modèle de sortie «agrégation des ventes par catégorie» sont définis. Spécifiez @ windgate.jdbc.column pour la propriété correspondant à la colonne dans la base de données. Pour plus d'informations sur les paramètres Windgate, consultez Générer automatiquement DataModelJdbcSupport (http://docs.asakusafw.com/latest/release/ja/html/windgate/user-guide.html#datamodeljdbcsupport).

src/main/dmdl/models.dmdl


"Détails des ventes"
@windgate.jdbc.table(
    name = "demo.SALES_DETAIL"
)
sales_detail = {

    "Date et heure de vente"
    @windgate.jdbc.column(name = "SALES_DATE_TIME")
    sales_date_time : DATETIME;

    "Code produit"
    @windgate.jdbc.column(name = "ITEM_CODE")
    item_code : TEXT;

    "quantité"
    @windgate.jdbc.column(name = "AMOUNT")
    amount : INT;
};

"Maître de produit"
@windgate.jdbc.table(
    name = "demo.ITEM_INFO"
)
item_info = {

    "Code produit"
    @windgate.jdbc.column(name = "ITEM_CODE")
    item_code : TEXT;

    "Nom du produit"
    @windgate.jdbc.column(name = "ITEM_NAME")
    item_name : TEXT;

    "Code de catégorie de produit"
    @windgate.jdbc.column(name = "CATEGORY_CODE")
    category_code : TEXT;

    "Nom de la catégorie de produit"
    @windgate.jdbc.column(name = "CATEGORY_NAME")
    category_name : TEXT;

    "Prix unitaire"
    @windgate.jdbc.column(name = "UNIT_PRICE")
    unit_price : INT;

    "Date d'enregistrement principal"
    @windgate.jdbc.column(name = "REGISTERED_DATE")
    registered_date : DATE;

    "Date de début de la demande principale"
    @windgate.jdbc.column(name = "BEGIN_DATE")
    begin_date : DATE;

    "Date de fin de la demande principale"
    @windgate.jdbc.column(name = "END_DATE")
    end_date : DATE;
};

"Ventes totales par catégorie"
@windgate.jdbc.table(
    name = "demo.CATEGORY_SUMMARY"
)
category_summary = {

    sales_date_time : DATETIME;

    item_code : TEXT;

    @windgate.jdbc.column(name = "CATEGORY_CODE")
    category_code : TEXT;

    @windgate.jdbc.column(name = "CATEGORY_NAME")
    category_name : TEXT;

    @windgate.jdbc.column(name = "AMOUNT")
    amount : INT;

    @windgate.jdbc.column(name = "SELLING_PRICE")
    selling_price : INT;
};

Lorsque vous exécutez la commande Gradle suivante, une classe de modèle de données pouvant être utilisée par Asakusa Framework est générée en fonction du fichier de script créé. (Exécuter sous le dossier du projet)

gradle compileDMDL

Classe de définition d'entrée / sortie

Dans chaque classe de définition d'entrée / sortie, définissez profileName à associer aux paramètres JDBC définis dans le fichier m3bp.properties. De plus, la classe SalesDetailFromJDBC.java associée à la table des articles de vente a une expression conditionnelle qui correspond à la clause WHERE de SQL. La valeur de l'argument au démarrage du lot est affectée à «$ {DATE}».

src/main/java/com/example/jobflow/SalesDetailFromJDBC.java


package com.example.jobflow;

import com.example.modelgen.dmdl.jdbc.AbstractSalesDetailJdbcImporterDescription;

public class SalesDetailFromJDBC extends AbstractSalesDetailJdbcImporterDescription {

    @Override
    public String getProfileName() {
        return "example";
    }

    @Override
    public String getCondition() {
        return "SALES_DATE_TIME between '${DATE} 00:00:00' and '${DATE} 23:59:59'";
    }
}

src/main/java/com/example/jobflow/ItemInfoFromJDBC.java


package com.example.jobflow;

import com.example.modelgen.dmdl.jdbc.AbstractItemInfoJdbcImporterDescription;

public class ItemInfoFromJDBC extends AbstractItemInfoJdbcImporterDescription {

    @Override
    public String getProfileName() {
        return "example";
    }
}

src/main/java/com/example/jobflow/CategorySummaryToJDBC.java


package com.example.jobflow;

import com.example.modelgen.dmdl.jdbc.AbstractCategorySummaryJdbcExporterDescription;

public class CategorySummaryToJDBC extends AbstractCategorySummaryJdbcExporterDescription {

    @Override
    public String getProfileName() {
        return "example";
    }
}

Créer une classe d'opérateur

Parmi les processus décrits dans DFD, «1. Combinaison de produits génériques» et «2. Agrégation par catégorie» sont mis en œuvre en tant qu'opérateurs. "1. Combiner des produits maîtres" est la méthode joinItemInfo ([opérateur MasterJoinUpdate](http://docs.asakusafw.com/latest/release/ja/html/dsl/operators.html#master-join- Il est implémenté par update-operator)). Étant donné que le code produit seul ne donne pas une combinaison équivalente, définissez la condition de sélection principale ([opérateur auxiliaire MasterSelection](http://docs.asakusafw.com/latest/release/ja/html/dsl/operators.html#master-" Il est ajouté par selection-support-operator)). «2. Agrégation par catégorie» est la méthode «summaryByCategory» ([opérateur «Fold») (http://docs.asakusafw.com/latest/release/ja/html/dsl/operators.html#fold-operator) ) Est implémenté. La quantité et le montant des ventes sont totalisés lors du pliage en utilisant le code de catégorie comme clé.

src/main/java/com/example/operator/SummarizeSalesOperator.java


package com.example.operator;

import java.util.List;

import com.asakusafw.runtime.value.Date;
import com.asakusafw.runtime.value.DateTime;
import com.asakusafw.runtime.value.DateUtil;
import com.asakusafw.vocabulary.model.Key;
import com.asakusafw.vocabulary.operator.Fold;
import com.asakusafw.vocabulary.operator.MasterJoinUpdate;
import com.asakusafw.vocabulary.operator.MasterSelection;
import com.example.modelgen.dmdl.model.CategorySummary;
import com.example.modelgen.dmdl.model.ItemInfo;

public abstract class SummarizeSalesOperator {
    private final Date dateBuffer = new Date();

    @MasterSelection
    public ItemInfo selectAvailableItem(List<ItemInfo> candidates, CategorySummary sales) {
        DateTime dateTime = sales.getSalesDateTime();
        dateBuffer.setElapsedDays(DateUtil.getDayFromDate(
                dateTime.getYear(), dateTime.getMonth(), dateTime.getDay()));
        for (ItemInfo item : candidates) {
            if (item.getBeginDate().compareTo(dateBuffer) <= 0
                    && dateBuffer.compareTo(item.getEndDate()) <= 0) {
                return item;
            }
        }
        return null;
    }

    @MasterJoinUpdate(selection = "selectAvailableItem")
    public void joinItemInfo(
            @Key(group = "item_code") ItemInfo info,
            @Key(group = "item_code") CategorySummary sales) {
        sales.setCategoryCodeOption(info.getCategoryCodeOption());
        sales.setCategoryNameOption(info.getCategoryNameOption());
        sales.setSellingPrice(sales.getAmount() * info.getUnitPrice());
    }

    @Fold
    public void summarizeByCategory(@Key(group = "category_code") CategorySummary left, CategorySummary right) {
        left.setAmount(left.getAmount() + right.getAmount());
        left.setSellingPrice(left.getSellingPrice() + right.getSellingPrice());
    }
}

Créer une classe de flux de travaux

Selon la conception de DFD, entrez "Détails des ventes ( sales) "et" Product master (ʻitem) "et connectez-vous à" 1. Combine product master (joinItemInfo)" "2. Agréger par catégorie (summaryByCategory)" Implémente pour afficher le résultat du traitement dans "CategorySummary".

src/main/example/jobflow/SummarizeSalesJob.java


package com.example.jobflow;

import com.asakusafw.vocabulary.flow.Export;
import com.asakusafw.vocabulary.flow.FlowDescription;
import com.asakusafw.vocabulary.flow.Import;
import com.asakusafw.vocabulary.flow.In;
import com.asakusafw.vocabulary.flow.JobFlow;
import com.asakusafw.vocabulary.flow.Out;
import com.asakusafw.vocabulary.flow.util.CoreOperatorFactory;
import com.example.modelgen.dmdl.model.CategorySummary;
import com.example.modelgen.dmdl.model.ItemInfo;
import com.example.modelgen.dmdl.model.SalesDetail;
import com.example.operator.SummarizeSalesOperatorFactory;
import com.example.operator.SummarizeSalesOperatorFactory.JoinItemInfo;
import com.example.operator.SummarizeSalesOperatorFactory.SummarizeByCategory;

@JobFlow(name = "summarizeSalesJob")
public class SummarizeSalesJob extends FlowDescription {
    final In<SalesDetail> sales;
    final In<ItemInfo> item;
    final Out<CategorySummary> categorySummary;
    public SummarizeSalesJob(
            @Import(name = "sales", description = SalesDetailFromJDBC.class)
            In<SalesDetail> sales,
            @Import(name = "item", description = ItemInfoFromJDBC.class)
            In<ItemInfo> item,
            @Export(name = "result", description = CategorySummaryToJDBC.class)
            Out<CategorySummary> categorySummary) {
        this.sales = sales;
        this.item = item;
        this.categorySummary = categorySummary;
    }

    @Override
    protected void describe() {
        CoreOperatorFactory core = new CoreOperatorFactory();
        SummarizeSalesOperatorFactory operator = new SummarizeSalesOperatorFactory();

        JoinItemInfo joinedItem
                = operator.joinItemInfo(item, core.restructure(sales, CategorySummary.class));
        core.stop(joinedItem.missed);
        SummarizeByCategory summarized = operator.summarizeByCategory(joinedItem.updated);
        categorySummary.add(summarized.out);

Créer une classe batch

Implémentez une classe batch qui exécute le flux de travaux (SummarizeSalesJob).

src/main/java/com/example/batch/SummarizeBatch.java


package com.example.batch;

import com.asakusafw.vocabulary.batch.Batch;
import com.asakusafw.vocabulary.batch.BatchDescription;
import com.example.jobflow.SummarizeSalesJob;

@Batch(name = "example.summarizeSales")
public class SummarizeBatch extends BatchDescription {

    @Override
    protected void describe() {
        run(SummarizeSalesJob.class).soon();
    }
}

Créer et déployer une application

Installez à l'avance les packages requis pour l'environnement de construction M3BP.

sudo yum -y install cmake
sudo yum -y install make
sudo yum -y install gcc-c++
sudo yum -y install hwloc

Définissez la variable d'environnement ʻASAKUSA_HOME dans .bash_profile` etc. comme indiqué dans l'exemple ci-dessous.

.bash_profile


export ASAKUSA_HOME=$HOME/asakusa

Exécutez la commande gradle assemble à partir du dossier du projet pour créer un fichier d'archive de déploiement pour M3BP. Développez le fichier créé sur le chemin de la variable d'environnement ʻASAKUSA_HOME et exécutez la commande setup.jar`.

gradle assemble
rm -rf $ASAKUSA_HOME
mkdir -p $ASAKUSA_HOME
cp ./build/asakusafw-*.tar.gz $ASAKUSA_HOME
cd $ASAKUSA_HOME
tar xzf asakusafw-*.tar.gz
java -jar $ASAKUSA_HOME/tools/bin/setup.jar

Lancer l'application

Exécutez l'application sur M3BP avec l'ID de lot comme argument dans YAESS. Spécifiez «2017-04-01» pour le paramètre de lot «DATE». Selon les conditions définies dans SalesDetailFromJDBC.java de la classe de définition d'entrée, les données de détail des ventes de la date spécifiée dans l'argument de lot seront traitées.

$ASAKUSA_HOME/yaess/bin/yaess-batch.sh m3bp.example.summarizeSales -A DATE=2017-04-01

Le résultat de l'exécution est enregistré dans la table CATEGORY_SUMMARY.

mysql -u demo -p demo -e 'select * from CATEGORY_SUMMARY';
Enter password: 
+---------------+--------------------------+--------+---------------+
| CATEGORY_CODE | CATEGORY_NAME            | AMOUNT | SELLING_PRICE |
+---------------+--------------------------+--------+---------------+
| 1600          |bonbons au chocolat|     11 |          2220 |
| 1401          |Coupe de nouilles|      5 |           490 |
+---------------+--------------------------+--------+---------------+

finalement

Dans l'article précédent (Hello, World!), Vanilla sur Windows, et dans l'article précédent (Asakusa Framework et Impala sont liés aux outils BI) Visualize) a introduit le moteur d'exécution d'Asakusa pour Spark sur Hadoop, cette fois M3BP. Comme vous pouvez le voir dans les articles jusqu'à présent, il n'est pas nécessaire de modifier le code source, et vous pouvez changer le moteur d'exécution simplement en changeant les paramètres du plug-in de build.gradle. Nous espérons que cela vous sera utile lors de l'examen du type d'architecture qui convient le mieux à la taille de vos données.

Recommended Posts

Lier Asakusa sur M3 BP et DB
Traitement des liens et SQLite