Verbinde Asakusa mit M3 BP und DB

Einführung

Im vorherigen Artikel Integration von Asakusa Framework und Impala zur Visualisierung mit BI-Tools wurden umfangreiche Daten in Hadoop verarbeitet Einführung der Implementierungsmethode der Architektur unter der Annahme. Dieses Mal werde ich die Implementierungsmethode zum Verknüpfen von Geschäftsdaten in der Datenbank mit [Asakusa on M3BP] vorstellen (http://docs.asakusafw.com/latest/release/ja/html/m3bp/index.html). .. Die folgende Abbildung fasst die Merkmale der Architektur zusammen.

m3bp_windgate.png

Was Sie in diesem Artikel tun möchten

Die Beispielanwendung gibt die Tabelle "Verkaufsdetails" und die Tabelle "Produktstamm" aus der Datenbank ein, aggregiert den Verkaufsbetrag für jede Kategorie und gibt ihn in die Tabelle "Verkaufsaggregation nach Kategorie" aus.

Verarbeitung von "1. Produktmaster kombinieren"

Verarbeitung von "2. Aggregation nach Kategorie" Summieren Sie die Menge und den Verkaufsbetrag mit dem Kategoriecode als Schlüssel

summarizeSales_dfd.png

Die Definitionen der Eingabe- und Ausgabetabelle lauten wie folgt:

Artikel Schimmel PK
Verkaufsdatum und -zeit (VERKAUF_DATE_TIME) DATETIME
Produktcode (ARTIKEL_CODE) VARCHAR(13)
Gesamtmenge) INT
Artikel Schimmel PK
Produktcode (ARTIKEL_CODE) VARCHAR(13)
Produktname (ARTIKEL_NAME) VARCHAR(128)
Kategoriecode (KATEGORIE_CODE) CHAR(4)
Kategoriename (KATEGORIE_NAME) CHAR(128)
Stückpreis (EINHEIT_PRICE) INT
Master-Registrierungsdatum (REGISTRIERT)_DATE) DATE
Startdatum der Master-Bewerbung (BEGIN)_DATE) DATE
Enddatum der Master-Bewerbung (ENDE)_DATE) DATE
Artikel Schimmel PK
Kategoriecode (KATEGORIE_CODE) CHAR(4)
Kategoriename (KATEGORIE_NAME) CHAR(128)
Gesamtmenge) INT
Verkaufsbetrag (VERKAUF_PRICE) INT

Betriebsumgebung

Wir haben den Betrieb in der folgenden Umgebung bestätigt.

Als Vorarbeit muss die oben genannte Software ("MySQL", "JDK", "Gradle") unter CentOS installiert werden.

Der Quellcode ist auf GitHub zu finden.

Vorbereitung der DB-Daten

demo.sql


CREATE DATABASE demo;
CREATE USER demo@'%' IDENTIFIED BY 'demo';
GRANT ALL ON demo.* TO demo@'%';
CREATE USER demo@localhost IDENTIFIED BY 'demo';
GRANT ALL ON demo.* TO demo@localhost;

CREATE TABLE demo.SALES_DETAIL
(
	SALES_DATE_TIME DATETIME NOT NULL COMMENT 'Verkaufsdatum und -zeit',
	ITEM_CODE VARCHAR(13) NOT NULL COMMENT 'Produktcode',
	AMOUNT INT NOT NULL COMMENT 'Menge',
	PRIMARY KEY(SALES_DATE_TIME, ITEM_CODE)
) COMMENT = 'Verkaufsdetails';

CREATE TABLE demo.ITEM_INFO
(
	ITEM_CODE VARCHAR(13) NOT NULL COMMENT 'Produktcode',
	ITEM_NAME VARCHAR(128) COMMENT 'Produktname',
	CATEGORY_CODE CHAR(4) NOT NULL COMMENT 'Kategoriecode',
	CATEGORY_NAME VARCHAR(128) COMMENT 'Kategoriename',
	UNIT_PRICE INT NOT NULL COMMENT 'Stückpreis',
	REGISTERED_DATE DATE NOT NULL COMMENT 'Datum der Master-Registrierung',
	BEGIN_DATE DATE NOT NULL COMMENT 'Startdatum der Master-Anwendung',
	END_DATE DATE NOT NULL COMMENT 'Enddatum der Master-Bewerbung',
	PRIMARY KEY(ITEM_CODE, BEGIN_DATE)
) COMMENT = 'Produktstamm';

CREATE TABLE demo.CATEGORY_SUMMARY
(
	CATEGORY_CODE CHAR(4) NOT NULL COMMENT 'Kategoriecode',
	CATEGORY_NAME VARCHAR(128) COMMENT 'Kategoriename',
	AMOUNT INT NOT NULL COMMENT 'Menge',
	SELLING_PRICE INT NOT NULL COMMENT 'Verkaufszahlen'
) COMMENT = 'Gesamtumsatz nach Kategorien';

INSERT INTO SALES_DETAIL VALUES ('2017-03-31 23:59:59','4922010001000',3);
INSERT INTO SALES_DETAIL VALUES ('2017-04-01 10:30:00','4922010001000',3);
INSERT INTO SALES_DETAIL VALUES ('2017-04-01 10:31:00','4922010001001',2);
INSERT INTO SALES_DETAIL VALUES ('2017-04-01 10:32:00','4922010001000',1);
INSERT INTO SALES_DETAIL VALUES ('2017-04-01 10:33:00','4922010001002',1);
INSERT INTO SALES_DETAIL VALUES ('2017-04-01 10:35:00','4922020002000',3);
INSERT INTO SALES_DETAIL VALUES ('2017-04-01 10:36:00','4922020002001',1);
INSERT INTO SALES_DETAIL VALUES ('2017-04-01 10:38:00','4922020002000',1);
INSERT INTO SALES_DETAIL VALUES ('2017-04-01 10:39:00','4922010001002',1);
INSERT INTO SALES_DETAIL VALUES ('2017-04-01 10:41:00','4922010001001',2);
INSERT INTO SALES_DETAIL VALUES ('2017-04-01 10:42:00','4922010001002',1);
INSERT INTO SALES_DETAIL VALUES ('2017-04-02 10:10:00','4922020002002',2);

INSERT INTO ITEM_INFO VALUES ('4922010001000','Milchschokolade M.','1600','Praline',120,'2017-04-01','2017-04-01','2018-01-19');
INSERT INTO ITEM_INFO VALUES ('4922010001000','Milchschokolade M.','1600','Praline',130,'2018-01-20','2018-01-20','2019-12-31');
INSERT INTO ITEM_INFO VALUES ('4922010001001','PREMIUM Verschiedene Schokolade','1600','Praline',330,'2017-04-01','2017-04-01','2019-12-31');
INSERT INTO ITEM_INFO VALUES ('4922010001002','Mandel Crunch Mini','1600','Praline',140,'2017-04-01','2017-04-01','2019-12-31');
INSERT INTO ITEM_INFO VALUES ('4922020002000','Tasse Nudelsojasauce','1401','Cup Nudel',98,'2017-04-01','2017-04-01','2019-12-31');
INSERT INTO ITEM_INFO VALUES ('4922020002001','Tasse Nudelsalz','1401','Cup Nudel',98,'2017-04-01','2017-04-01','2019-12-31');
INSERT INTO ITEM_INFO VALUES ('4922020002002','Cup Nudel Curry','1401','Cup Nudel',120,'2017-04-01','2017-04-01','2019-12-31');

commit;

Reflektiert die erstellte SQL-Datei.

mysql -u demo -p demo < demo.sql

Vorbereitung der Entwicklungsumgebung

Erstellen Sie einen Projektordner. In diesem Beispiel werden wir an den folgenden Ordnern arbeiten.

mkdir asakusa-example-windgate

Erstellen Sie eine Gradle-Skriptdatei unter dem Projektordner. Die folgenden Einstellungen werden hauptsächlich hinzugefügt.

Weitere Informationen zu Gradle-Skripten finden Sie in der Asakusa Gradle Plugin-Referenz (http://docs.asakusafw.com/latest/release/ja/html/application/gradle-plugin-reference.html).

build.gradle


group 'com.example'

buildscript {
    repositories {
        maven { url 'http://asakusafw.s3.amazonaws.com/maven/releases' }
        maven { url 'http://asakusafw.s3.amazonaws.com/maven/snapshots' }
    }
    dependencies {
        classpath group: 'com.asakusafw.gradle', name: 'asakusa-distribution', version: '0.10.0'
    }
}

apply plugin: 'asakusafw-sdk'
apply plugin: 'asakusafw-organizer'
apply plugin: 'asakusafw-m3bp'
apply plugin: 'eclipse'

asakusafw {
    m3bp {
        option 'm3bp.native.cmake.CMAKE_TOOLCHAIN_FILE', System.getProperty('m3bp.toolchain')
        option 'windgate.jdbc.direct', '*'
    }
}

asakusafwOrganizer {
    profiles.prod {
        hadoop.embed true
        assembly.into('.') {
            put 'src/dist/prod'
        }
    }
    extension {
        libraries += ['mysql:mysql-connector-java:5.1.45']
    }
}

Führen Sie im Projektordner den folgenden Gradle-Befehl aus, um ihn als Eclipse-Projekt zu importieren. Wenn Sie IntelliJ IDEA verwenden möchten, lesen Sie bitte Using Official IntelliJ. Wenn Sie Eclipse verwenden, sollten Sie Shafu (Eclipse-Plug-In für die Asakusa-Entwicklung) verwenden.

gradle eclipse

M3BP-Einstellungen

Die Einstellungen für M3BP werden in der Datei "m3bp.properties" beschrieben. In diesem Abschnitt werden die Einstellungen für den JDBC-Treiber für WindGate JDBC Direct Mode beschrieben. Das im Eigenschaftsschlüssel unten enthaltene "Beispiel" entspricht dem in der Definitionsklasse für Eingabe / Ausgabe festgelegten "Profilname". Weitere Informationen finden Sie unter Asakusa zu den M3BP-Optimierungseinstellungen (http://docs.asakusafw.com/latest/release/ja/html/m3bp/optimization.html).

src/dist/prod/m3bp/conf/m3bp.properties


com.asakusafw.dag.jdbc.example.url=jdbc:mysql://localhost/demo
com.asakusafw.dag.jdbc.example.driver=com.mysql.jdbc.Driver
com.asakusafw.dag.jdbc.example.properties.user=demo
com.asakusafw.dag.jdbc.example.properties.password=demo

Generierung von Datenmodellklassen

DMDL (Data Model Definition Language) Erstellen Sie eine Skriptdatei. Das Eingabemodell "Verkaufsdetails" und "Produktstamm" sowie das Ausgabemodell "Verkaufsaggregation nach Kategorie" sind definiert. Geben Sie "@ windgate.jdbc.column" für die Eigenschaft an, die der Spalte in der Datenbank entspricht. Weitere Informationen zu den Windgate-Einstellungen finden Sie unter Automatisches Generieren von DataModelJdbcSupport (http://docs.asakusafw.com/latest/release/ja/html/windgate/user-guide.html#datamodeljdbcsupport).

src/main/dmdl/models.dmdl


"Verkaufsdetails"
@windgate.jdbc.table(
    name = "demo.SALES_DETAIL"
)
sales_detail = {

    "Verkaufsdatum und -zeit"
    @windgate.jdbc.column(name = "SALES_DATE_TIME")
    sales_date_time : DATETIME;

    "Produktcode"
    @windgate.jdbc.column(name = "ITEM_CODE")
    item_code : TEXT;

    "Menge"
    @windgate.jdbc.column(name = "AMOUNT")
    amount : INT;
};

"Produktstamm"
@windgate.jdbc.table(
    name = "demo.ITEM_INFO"
)
item_info = {

    "Produktcode"
    @windgate.jdbc.column(name = "ITEM_CODE")
    item_code : TEXT;

    "Produktname"
    @windgate.jdbc.column(name = "ITEM_NAME")
    item_name : TEXT;

    "Produktkategoriecode"
    @windgate.jdbc.column(name = "CATEGORY_CODE")
    category_code : TEXT;

    "Name der Produktkategorie"
    @windgate.jdbc.column(name = "CATEGORY_NAME")
    category_name : TEXT;

    "Stückpreis"
    @windgate.jdbc.column(name = "UNIT_PRICE")
    unit_price : INT;

    "Datum der Master-Registrierung"
    @windgate.jdbc.column(name = "REGISTERED_DATE")
    registered_date : DATE;

    "Startdatum der Master-Anwendung"
    @windgate.jdbc.column(name = "BEGIN_DATE")
    begin_date : DATE;

    "Enddatum der Master-Bewerbung"
    @windgate.jdbc.column(name = "END_DATE")
    end_date : DATE;
};

"Gesamtumsatz nach Kategorien"
@windgate.jdbc.table(
    name = "demo.CATEGORY_SUMMARY"
)
category_summary = {

    sales_date_time : DATETIME;

    item_code : TEXT;

    @windgate.jdbc.column(name = "CATEGORY_CODE")
    category_code : TEXT;

    @windgate.jdbc.column(name = "CATEGORY_NAME")
    category_name : TEXT;

    @windgate.jdbc.column(name = "AMOUNT")
    amount : INT;

    @windgate.jdbc.column(name = "SELLING_PRICE")
    selling_price : INT;
};

Wenn Sie den folgenden Gradle-Befehl ausführen, wird basierend auf der erstellten Skriptdatei eine Datenmodellklasse generiert, die von Asakusa Framework verwendet werden kann. (Unter dem Projektordner ausführen)

gradle compileDMDL

Eingabe- / Ausgabedefinitionsklasse

Stellen Sie in jeder Eingabe- / Ausgabedefinitionsklasse profileName so ein, dass es den in der Datei m3bp.properties definierten JDBC-Einstellungen zugeordnet wird. Darüber hinaus verfügt die Klasse "SalesDetailFromJDBC.java", die sich auf die Verkaufselementtabelle bezieht, über einen bedingten Ausdruck, der der WHERE-Klausel von SQL entspricht. Der Wert des Arguments beim Batch-Start wird "$ {DATE}" zugewiesen.

src/main/java/com/example/jobflow/SalesDetailFromJDBC.java


package com.example.jobflow;

import com.example.modelgen.dmdl.jdbc.AbstractSalesDetailJdbcImporterDescription;

public class SalesDetailFromJDBC extends AbstractSalesDetailJdbcImporterDescription {

    @Override
    public String getProfileName() {
        return "example";
    }

    @Override
    public String getCondition() {
        return "SALES_DATE_TIME between '${DATE} 00:00:00' and '${DATE} 23:59:59'";
    }
}

src/main/java/com/example/jobflow/ItemInfoFromJDBC.java


package com.example.jobflow;

import com.example.modelgen.dmdl.jdbc.AbstractItemInfoJdbcImporterDescription;

public class ItemInfoFromJDBC extends AbstractItemInfoJdbcImporterDescription {

    @Override
    public String getProfileName() {
        return "example";
    }
}

src/main/java/com/example/jobflow/CategorySummaryToJDBC.java


package com.example.jobflow;

import com.example.modelgen.dmdl.jdbc.AbstractCategorySummaryJdbcExporterDescription;

public class CategorySummaryToJDBC extends AbstractCategorySummaryJdbcExporterDescription {

    @Override
    public String getProfileName() {
        return "example";
    }
}

Erstellen einer Operatorklasse

Von den in DFD beschriebenen Prozessen sind "1. Kombinieren von Produktmastern" und "2. Aggregation nach Kategorie" als Operatoren implementiert. "1. Kombinieren von Produktmastern" ist die Methode "joinItemInfo" ([Operator "MasterJoinUpdate") (http://docs.asakusafw.com/latest/release/ja/html/dsl/operators.html#master-join- Es wird vom Update-Operator implementiert)). Da der Produktcode allein keine äquivalente Kombination ergibt, legen Sie die Hauptauswahlbedingung fest ([Hilfsoperator "MasterSelection") (http://docs.asakusafw.com/latest/release/ja/html/dsl/operators.html#master- " Es wird vom Auswahl-Support-Operator hinzugefügt. "2. Aggregation nach Kategorie" ist die Methode "summarizeByCategory" ([Fold-Operator]) (http://docs.asakusafw.com/latest/release/ja/html/dsl/operators.html#fold-operator). ) Ist implementiert. Die Menge und der Verkaufsbetrag werden beim Falten mit dem Kategoriecode als Schlüssel summiert.

src/main/java/com/example/operator/SummarizeSalesOperator.java


package com.example.operator;

import java.util.List;

import com.asakusafw.runtime.value.Date;
import com.asakusafw.runtime.value.DateTime;
import com.asakusafw.runtime.value.DateUtil;
import com.asakusafw.vocabulary.model.Key;
import com.asakusafw.vocabulary.operator.Fold;
import com.asakusafw.vocabulary.operator.MasterJoinUpdate;
import com.asakusafw.vocabulary.operator.MasterSelection;
import com.example.modelgen.dmdl.model.CategorySummary;
import com.example.modelgen.dmdl.model.ItemInfo;

public abstract class SummarizeSalesOperator {
    private final Date dateBuffer = new Date();

    @MasterSelection
    public ItemInfo selectAvailableItem(List<ItemInfo> candidates, CategorySummary sales) {
        DateTime dateTime = sales.getSalesDateTime();
        dateBuffer.setElapsedDays(DateUtil.getDayFromDate(
                dateTime.getYear(), dateTime.getMonth(), dateTime.getDay()));
        for (ItemInfo item : candidates) {
            if (item.getBeginDate().compareTo(dateBuffer) <= 0
                    && dateBuffer.compareTo(item.getEndDate()) <= 0) {
                return item;
            }
        }
        return null;
    }

    @MasterJoinUpdate(selection = "selectAvailableItem")
    public void joinItemInfo(
            @Key(group = "item_code") ItemInfo info,
            @Key(group = "item_code") CategorySummary sales) {
        sales.setCategoryCodeOption(info.getCategoryCodeOption());
        sales.setCategoryNameOption(info.getCategoryNameOption());
        sales.setSellingPrice(sales.getAmount() * info.getUnitPrice());
    }

    @Fold
    public void summarizeByCategory(@Key(group = "category_code") CategorySummary left, CategorySummary right) {
        left.setAmount(left.getAmount() + right.getAmount());
        left.setSellingPrice(left.getSellingPrice() + right.getSellingPrice());
    }
}

Erstellen einer Jobflussklasse

Geben Sie gemäß DFD-Design "Verkaufsdetails ( Verkäufe) "und" Produktstamm (Artikel)" ein und stellen Sie eine Verbindung zu "1. Produktstamm ( joinItemInfo) kombinieren "" 2. Nach Kategorie aggregieren (summarizeByCategory)" Implementieren, um das Verarbeitungsergebnis an "CategorySummary`" auszugeben.

src/main/example/jobflow/SummarizeSalesJob.java


package com.example.jobflow;

import com.asakusafw.vocabulary.flow.Export;
import com.asakusafw.vocabulary.flow.FlowDescription;
import com.asakusafw.vocabulary.flow.Import;
import com.asakusafw.vocabulary.flow.In;
import com.asakusafw.vocabulary.flow.JobFlow;
import com.asakusafw.vocabulary.flow.Out;
import com.asakusafw.vocabulary.flow.util.CoreOperatorFactory;
import com.example.modelgen.dmdl.model.CategorySummary;
import com.example.modelgen.dmdl.model.ItemInfo;
import com.example.modelgen.dmdl.model.SalesDetail;
import com.example.operator.SummarizeSalesOperatorFactory;
import com.example.operator.SummarizeSalesOperatorFactory.JoinItemInfo;
import com.example.operator.SummarizeSalesOperatorFactory.SummarizeByCategory;

@JobFlow(name = "summarizeSalesJob")
public class SummarizeSalesJob extends FlowDescription {
    final In<SalesDetail> sales;
    final In<ItemInfo> item;
    final Out<CategorySummary> categorySummary;
    public SummarizeSalesJob(
            @Import(name = "sales", description = SalesDetailFromJDBC.class)
            In<SalesDetail> sales,
            @Import(name = "item", description = ItemInfoFromJDBC.class)
            In<ItemInfo> item,
            @Export(name = "result", description = CategorySummaryToJDBC.class)
            Out<CategorySummary> categorySummary) {
        this.sales = sales;
        this.item = item;
        this.categorySummary = categorySummary;
    }

    @Override
    protected void describe() {
        CoreOperatorFactory core = new CoreOperatorFactory();
        SummarizeSalesOperatorFactory operator = new SummarizeSalesOperatorFactory();

        JoinItemInfo joinedItem
                = operator.joinItemInfo(item, core.restructure(sales, CategorySummary.class));
        core.stop(joinedItem.missed);
        SummarizeByCategory summarized = operator.summarizeByCategory(joinedItem.updated);
        categorySummary.add(summarized.out);

Erstellen einer Stapelklasse

Implementieren Sie eine Stapelklasse, die den Jobfluss ausführt (SummarizeSalesJob).

src/main/java/com/example/batch/SummarizeBatch.java


package com.example.batch;

import com.asakusafw.vocabulary.batch.Batch;
import com.asakusafw.vocabulary.batch.BatchDescription;
import com.example.jobflow.SummarizeSalesJob;

@Batch(name = "example.summarizeSales")
public class SummarizeBatch extends BatchDescription {

    @Override
    protected void describe() {
        run(SummarizeSalesJob.class).soon();
    }
}

Erstellen und Bereitstellen einer Anwendung

Installieren Sie die für die M3BP-Build-Umgebung erforderlichen Pakete im Voraus.

sudo yum -y install cmake
sudo yum -y install make
sudo yum -y install gcc-c++
sudo yum -y install hwloc

Setzen Sie die Umgebungsvariable ASAKUSA_HOME in .bash_profile usw. wie im folgenden Beispiel gezeigt.

.bash_profile


export ASAKUSA_HOME=$HOME/asakusa

Führen Sie den Befehl "gradle assemble" aus dem Projektordner aus, um eine Bereitstellungsarchivdatei für M3BP zu erstellen. Erweitern Sie die erstellte Datei im Pfad der Umgebungsvariablen "ASAKUSA_HOME" und führen Sie den Befehl "setup.jar" aus.

gradle assemble
rm -rf $ASAKUSA_HOME
mkdir -p $ASAKUSA_HOME
cp ./build/asakusafw-*.tar.gz $ASAKUSA_HOME
cd $ASAKUSA_HOME
tar xzf asakusafw-*.tar.gz
java -jar $ASAKUSA_HOME/tools/bin/setup.jar

Führen Sie die Anwendung aus

Führen Sie die Anwendung auf M3BP mit der Stapel-ID als Argument in [YAESS] aus (http://docs.asakusafw.com/latest/release/ja/html/yaess/index.html). Geben Sie "2017-04-01" für den Stapelparameter "DATE" an. Gemäß den in "SalesDetailFromJDBC.java" der Eingabedefinitionsklasse festgelegten Bedingungen werden die Verkaufsdetaildaten des im Stapelargument angegebenen Datums verarbeitet.

$ASAKUSA_HOME/yaess/bin/yaess-batch.sh m3bp.example.summarizeSales -A DATE=2017-04-01

Das Ausführungsergebnis wird in der Tabelle CATEGORY_SUMMARY registriert.

mysql -u demo -p demo -e 'select * from CATEGORY_SUMMARY';
Enter password: 
+---------------+--------------------------+--------+---------------+
| CATEGORY_CODE | CATEGORY_NAME            | AMOUNT | SELLING_PRICE |
+---------------+--------------------------+--------+---------------+
| 1600          |Praline|     11 |          2220 |
| 1401          |Cup Nudel|      5 |           490 |
+---------------+--------------------------+--------+---------------+

Schließlich

Im vorherigen Artikel (Hello, World!), Vanilla unter Windows und im vorherigen Artikel (Asakusa Framework und Impala sind mit BI-Tools verknüpft) Visualize) hat Asakusas Ausführungs-Engine für Spark on Hadoop eingeführt, diesmal M3BP. Wie Sie den bisherigen Artikeln entnehmen können, muss der Quellcode nicht geändert werden, und Sie können die Ausführungs-Engine einfach durch Ändern der Plug-In-Einstellungen von build.gradle ändern. Wir hoffen, dass dies hilfreich ist, wenn Sie überlegen, welche Art von Architektur für Ihre Datengröße am besten geeignet ist.

Recommended Posts

Verbinde Asakusa mit M3 BP und DB
Linkverarbeitung und SQLite