Im vorherigen Artikel Integration von Asakusa Framework und Impala zur Visualisierung mit BI-Tools wurden umfangreiche Daten in Hadoop verarbeitet Einführung der Implementierungsmethode der Architektur unter der Annahme. Dieses Mal werde ich die Implementierungsmethode zum Verknüpfen von Geschäftsdaten in der Datenbank mit [Asakusa on M3BP] vorstellen (http://docs.asakusafw.com/latest/release/ja/html/m3bp/index.html). .. Die folgende Abbildung fasst die Merkmale der Architektur zusammen.
Die Beispielanwendung gibt die Tabelle "Verkaufsdetails" und die Tabelle "Produktstamm" aus der Datenbank ein, aggregiert den Verkaufsbetrag für jede Kategorie und gibt ihn in die Tabelle "Verkaufsaggregation nach Kategorie" aus.
Verarbeitung von "1. Produktmaster kombinieren"
Verarbeitung von "2. Aggregation nach Kategorie" Summieren Sie die Menge und den Verkaufsbetrag mit dem Kategoriecode als Schlüssel
Die Definitionen der Eingabe- und Ausgabetabelle lauten wie folgt:
Artikel | Schimmel | PK |
---|---|---|
Verkaufsdatum und -zeit (VERKAUF_DATE_TIME) | DATETIME | 〇 |
Produktcode (ARTIKEL_CODE) | VARCHAR(13) | 〇 |
Gesamtmenge) | INT |
Artikel | Schimmel | PK |
---|---|---|
Produktcode (ARTIKEL_CODE) | VARCHAR(13) | 〇 |
Produktname (ARTIKEL_NAME) | VARCHAR(128) | |
Kategoriecode (KATEGORIE_CODE) | CHAR(4) | |
Kategoriename (KATEGORIE_NAME) | CHAR(128) | |
Stückpreis (EINHEIT_PRICE) | INT | |
Master-Registrierungsdatum (REGISTRIERT)_DATE) | DATE | |
Startdatum der Master-Bewerbung (BEGIN)_DATE) | DATE | 〇 |
Enddatum der Master-Bewerbung (ENDE)_DATE) | DATE |
Artikel | Schimmel | PK |
---|---|---|
Kategoriecode (KATEGORIE_CODE) | CHAR(4) | |
Kategoriename (KATEGORIE_NAME) | CHAR(128) | |
Gesamtmenge) | INT | |
Verkaufsbetrag (VERKAUF_PRICE) | INT |
Wir haben den Betrieb in der folgenden Umgebung bestätigt.
Als Vorarbeit muss die oben genannte Software ("MySQL", "JDK", "Gradle") unter CentOS installiert werden.
Der Quellcode ist auf GitHub zu finden.
demo.sql
CREATE DATABASE demo;
CREATE USER demo@'%' IDENTIFIED BY 'demo';
GRANT ALL ON demo.* TO demo@'%';
CREATE USER demo@localhost IDENTIFIED BY 'demo';
GRANT ALL ON demo.* TO demo@localhost;
CREATE TABLE demo.SALES_DETAIL
(
SALES_DATE_TIME DATETIME NOT NULL COMMENT 'Verkaufsdatum und -zeit',
ITEM_CODE VARCHAR(13) NOT NULL COMMENT 'Produktcode',
AMOUNT INT NOT NULL COMMENT 'Menge',
PRIMARY KEY(SALES_DATE_TIME, ITEM_CODE)
) COMMENT = 'Verkaufsdetails';
CREATE TABLE demo.ITEM_INFO
(
ITEM_CODE VARCHAR(13) NOT NULL COMMENT 'Produktcode',
ITEM_NAME VARCHAR(128) COMMENT 'Produktname',
CATEGORY_CODE CHAR(4) NOT NULL COMMENT 'Kategoriecode',
CATEGORY_NAME VARCHAR(128) COMMENT 'Kategoriename',
UNIT_PRICE INT NOT NULL COMMENT 'Stückpreis',
REGISTERED_DATE DATE NOT NULL COMMENT 'Datum der Master-Registrierung',
BEGIN_DATE DATE NOT NULL COMMENT 'Startdatum der Master-Anwendung',
END_DATE DATE NOT NULL COMMENT 'Enddatum der Master-Bewerbung',
PRIMARY KEY(ITEM_CODE, BEGIN_DATE)
) COMMENT = 'Produktstamm';
CREATE TABLE demo.CATEGORY_SUMMARY
(
CATEGORY_CODE CHAR(4) NOT NULL COMMENT 'Kategoriecode',
CATEGORY_NAME VARCHAR(128) COMMENT 'Kategoriename',
AMOUNT INT NOT NULL COMMENT 'Menge',
SELLING_PRICE INT NOT NULL COMMENT 'Verkaufszahlen'
) COMMENT = 'Gesamtumsatz nach Kategorien';
INSERT INTO SALES_DETAIL VALUES ('2017-03-31 23:59:59','4922010001000',3);
INSERT INTO SALES_DETAIL VALUES ('2017-04-01 10:30:00','4922010001000',3);
INSERT INTO SALES_DETAIL VALUES ('2017-04-01 10:31:00','4922010001001',2);
INSERT INTO SALES_DETAIL VALUES ('2017-04-01 10:32:00','4922010001000',1);
INSERT INTO SALES_DETAIL VALUES ('2017-04-01 10:33:00','4922010001002',1);
INSERT INTO SALES_DETAIL VALUES ('2017-04-01 10:35:00','4922020002000',3);
INSERT INTO SALES_DETAIL VALUES ('2017-04-01 10:36:00','4922020002001',1);
INSERT INTO SALES_DETAIL VALUES ('2017-04-01 10:38:00','4922020002000',1);
INSERT INTO SALES_DETAIL VALUES ('2017-04-01 10:39:00','4922010001002',1);
INSERT INTO SALES_DETAIL VALUES ('2017-04-01 10:41:00','4922010001001',2);
INSERT INTO SALES_DETAIL VALUES ('2017-04-01 10:42:00','4922010001002',1);
INSERT INTO SALES_DETAIL VALUES ('2017-04-02 10:10:00','4922020002002',2);
INSERT INTO ITEM_INFO VALUES ('4922010001000','Milchschokolade M.','1600','Praline',120,'2017-04-01','2017-04-01','2018-01-19');
INSERT INTO ITEM_INFO VALUES ('4922010001000','Milchschokolade M.','1600','Praline',130,'2018-01-20','2018-01-20','2019-12-31');
INSERT INTO ITEM_INFO VALUES ('4922010001001','PREMIUM Verschiedene Schokolade','1600','Praline',330,'2017-04-01','2017-04-01','2019-12-31');
INSERT INTO ITEM_INFO VALUES ('4922010001002','Mandel Crunch Mini','1600','Praline',140,'2017-04-01','2017-04-01','2019-12-31');
INSERT INTO ITEM_INFO VALUES ('4922020002000','Tasse Nudelsojasauce','1401','Cup Nudel',98,'2017-04-01','2017-04-01','2019-12-31');
INSERT INTO ITEM_INFO VALUES ('4922020002001','Tasse Nudelsalz','1401','Cup Nudel',98,'2017-04-01','2017-04-01','2019-12-31');
INSERT INTO ITEM_INFO VALUES ('4922020002002','Cup Nudel Curry','1401','Cup Nudel',120,'2017-04-01','2017-04-01','2019-12-31');
commit;
Reflektiert die erstellte SQL-Datei.
mysql -u demo -p demo < demo.sql
Erstellen Sie einen Projektordner. In diesem Beispiel werden wir an den folgenden Ordnern arbeiten.
mkdir asakusa-example-windgate
Erstellen Sie eine Gradle-Skriptdatei unter dem Projektordner. Die folgenden Einstellungen werden hauptsächlich hinzugefügt.
Weitere Informationen zu Gradle-Skripten finden Sie in der Asakusa Gradle Plugin-Referenz (http://docs.asakusafw.com/latest/release/ja/html/application/gradle-plugin-reference.html).
build.gradle
group 'com.example'
buildscript {
repositories {
maven { url 'http://asakusafw.s3.amazonaws.com/maven/releases' }
maven { url 'http://asakusafw.s3.amazonaws.com/maven/snapshots' }
}
dependencies {
classpath group: 'com.asakusafw.gradle', name: 'asakusa-distribution', version: '0.10.0'
}
}
apply plugin: 'asakusafw-sdk'
apply plugin: 'asakusafw-organizer'
apply plugin: 'asakusafw-m3bp'
apply plugin: 'eclipse'
asakusafw {
m3bp {
option 'm3bp.native.cmake.CMAKE_TOOLCHAIN_FILE', System.getProperty('m3bp.toolchain')
option 'windgate.jdbc.direct', '*'
}
}
asakusafwOrganizer {
profiles.prod {
hadoop.embed true
assembly.into('.') {
put 'src/dist/prod'
}
}
extension {
libraries += ['mysql:mysql-connector-java:5.1.45']
}
}
Führen Sie im Projektordner den folgenden Gradle-Befehl aus, um ihn als Eclipse-Projekt zu importieren. Wenn Sie IntelliJ IDEA verwenden möchten, lesen Sie bitte Using Official IntelliJ. Wenn Sie Eclipse verwenden, sollten Sie Shafu (Eclipse-Plug-In für die Asakusa-Entwicklung) verwenden.
gradle eclipse
Die Einstellungen für M3BP werden in der Datei "m3bp.properties" beschrieben. In diesem Abschnitt werden die Einstellungen für den JDBC-Treiber für WindGate JDBC Direct Mode beschrieben. Das im Eigenschaftsschlüssel unten enthaltene "Beispiel" entspricht dem in der Definitionsklasse für Eingabe / Ausgabe festgelegten "Profilname". Weitere Informationen finden Sie unter Asakusa zu den M3BP-Optimierungseinstellungen (http://docs.asakusafw.com/latest/release/ja/html/m3bp/optimization.html).
src/dist/prod/m3bp/conf/m3bp.properties
com.asakusafw.dag.jdbc.example.url=jdbc:mysql://localhost/demo
com.asakusafw.dag.jdbc.example.driver=com.mysql.jdbc.Driver
com.asakusafw.dag.jdbc.example.properties.user=demo
com.asakusafw.dag.jdbc.example.properties.password=demo
DMDL (Data Model Definition Language) Erstellen Sie eine Skriptdatei. Das Eingabemodell "Verkaufsdetails" und "Produktstamm" sowie das Ausgabemodell "Verkaufsaggregation nach Kategorie" sind definiert. Geben Sie "@ windgate.jdbc.column" für die Eigenschaft an, die der Spalte in der Datenbank entspricht. Weitere Informationen zu den Windgate-Einstellungen finden Sie unter Automatisches Generieren von DataModelJdbcSupport (http://docs.asakusafw.com/latest/release/ja/html/windgate/user-guide.html#datamodeljdbcsupport).
src/main/dmdl/models.dmdl
"Verkaufsdetails"
@windgate.jdbc.table(
name = "demo.SALES_DETAIL"
)
sales_detail = {
"Verkaufsdatum und -zeit"
@windgate.jdbc.column(name = "SALES_DATE_TIME")
sales_date_time : DATETIME;
"Produktcode"
@windgate.jdbc.column(name = "ITEM_CODE")
item_code : TEXT;
"Menge"
@windgate.jdbc.column(name = "AMOUNT")
amount : INT;
};
"Produktstamm"
@windgate.jdbc.table(
name = "demo.ITEM_INFO"
)
item_info = {
"Produktcode"
@windgate.jdbc.column(name = "ITEM_CODE")
item_code : TEXT;
"Produktname"
@windgate.jdbc.column(name = "ITEM_NAME")
item_name : TEXT;
"Produktkategoriecode"
@windgate.jdbc.column(name = "CATEGORY_CODE")
category_code : TEXT;
"Name der Produktkategorie"
@windgate.jdbc.column(name = "CATEGORY_NAME")
category_name : TEXT;
"Stückpreis"
@windgate.jdbc.column(name = "UNIT_PRICE")
unit_price : INT;
"Datum der Master-Registrierung"
@windgate.jdbc.column(name = "REGISTERED_DATE")
registered_date : DATE;
"Startdatum der Master-Anwendung"
@windgate.jdbc.column(name = "BEGIN_DATE")
begin_date : DATE;
"Enddatum der Master-Bewerbung"
@windgate.jdbc.column(name = "END_DATE")
end_date : DATE;
};
"Gesamtumsatz nach Kategorien"
@windgate.jdbc.table(
name = "demo.CATEGORY_SUMMARY"
)
category_summary = {
sales_date_time : DATETIME;
item_code : TEXT;
@windgate.jdbc.column(name = "CATEGORY_CODE")
category_code : TEXT;
@windgate.jdbc.column(name = "CATEGORY_NAME")
category_name : TEXT;
@windgate.jdbc.column(name = "AMOUNT")
amount : INT;
@windgate.jdbc.column(name = "SELLING_PRICE")
selling_price : INT;
};
Wenn Sie den folgenden Gradle-Befehl ausführen, wird basierend auf der erstellten Skriptdatei eine Datenmodellklasse generiert, die von Asakusa Framework verwendet werden kann. (Unter dem Projektordner ausführen)
gradle compileDMDL
Stellen Sie in jeder Eingabe- / Ausgabedefinitionsklasse profileName
so ein, dass es den in der Datei m3bp.properties
definierten JDBC-Einstellungen zugeordnet wird.
Darüber hinaus verfügt die Klasse "SalesDetailFromJDBC.java", die sich auf die Verkaufselementtabelle bezieht, über einen bedingten Ausdruck, der der WHERE-Klausel von SQL entspricht. Der Wert des Arguments beim Batch-Start wird "$ {DATE}" zugewiesen.
src/main/java/com/example/jobflow/SalesDetailFromJDBC.java
package com.example.jobflow;
import com.example.modelgen.dmdl.jdbc.AbstractSalesDetailJdbcImporterDescription;
public class SalesDetailFromJDBC extends AbstractSalesDetailJdbcImporterDescription {
@Override
public String getProfileName() {
return "example";
}
@Override
public String getCondition() {
return "SALES_DATE_TIME between '${DATE} 00:00:00' and '${DATE} 23:59:59'";
}
}
src/main/java/com/example/jobflow/ItemInfoFromJDBC.java
package com.example.jobflow;
import com.example.modelgen.dmdl.jdbc.AbstractItemInfoJdbcImporterDescription;
public class ItemInfoFromJDBC extends AbstractItemInfoJdbcImporterDescription {
@Override
public String getProfileName() {
return "example";
}
}
src/main/java/com/example/jobflow/CategorySummaryToJDBC.java
package com.example.jobflow;
import com.example.modelgen.dmdl.jdbc.AbstractCategorySummaryJdbcExporterDescription;
public class CategorySummaryToJDBC extends AbstractCategorySummaryJdbcExporterDescription {
@Override
public String getProfileName() {
return "example";
}
}
Von den in DFD beschriebenen Prozessen sind "1. Kombinieren von Produktmastern" und "2. Aggregation nach Kategorie" als Operatoren implementiert.
"1. Kombinieren von Produktmastern" ist die Methode "joinItemInfo" ([Operator "MasterJoinUpdate") (http://docs.asakusafw.com/latest/release/ja/html/dsl/operators.html#master-join- Es wird vom Update-Operator implementiert)). Da der Produktcode allein keine äquivalente Kombination ergibt, legen Sie die Hauptauswahlbedingung fest ([Hilfsoperator "MasterSelection") (http://docs.asakusafw.com/latest/release/ja/html/dsl/operators.html#master- " Es wird vom Auswahl-Support-Operator hinzugefügt.
"2. Aggregation nach Kategorie" ist die Methode "summarizeByCategory" ([Fold
-Operator]) (http://docs.asakusafw.com/latest/release/ja/html/dsl/operators.html#fold-operator). ) Ist implementiert. Die Menge und der Verkaufsbetrag werden beim Falten mit dem Kategoriecode als Schlüssel summiert.
src/main/java/com/example/operator/SummarizeSalesOperator.java
package com.example.operator;
import java.util.List;
import com.asakusafw.runtime.value.Date;
import com.asakusafw.runtime.value.DateTime;
import com.asakusafw.runtime.value.DateUtil;
import com.asakusafw.vocabulary.model.Key;
import com.asakusafw.vocabulary.operator.Fold;
import com.asakusafw.vocabulary.operator.MasterJoinUpdate;
import com.asakusafw.vocabulary.operator.MasterSelection;
import com.example.modelgen.dmdl.model.CategorySummary;
import com.example.modelgen.dmdl.model.ItemInfo;
public abstract class SummarizeSalesOperator {
private final Date dateBuffer = new Date();
@MasterSelection
public ItemInfo selectAvailableItem(List<ItemInfo> candidates, CategorySummary sales) {
DateTime dateTime = sales.getSalesDateTime();
dateBuffer.setElapsedDays(DateUtil.getDayFromDate(
dateTime.getYear(), dateTime.getMonth(), dateTime.getDay()));
for (ItemInfo item : candidates) {
if (item.getBeginDate().compareTo(dateBuffer) <= 0
&& dateBuffer.compareTo(item.getEndDate()) <= 0) {
return item;
}
}
return null;
}
@MasterJoinUpdate(selection = "selectAvailableItem")
public void joinItemInfo(
@Key(group = "item_code") ItemInfo info,
@Key(group = "item_code") CategorySummary sales) {
sales.setCategoryCodeOption(info.getCategoryCodeOption());
sales.setCategoryNameOption(info.getCategoryNameOption());
sales.setSellingPrice(sales.getAmount() * info.getUnitPrice());
}
@Fold
public void summarizeByCategory(@Key(group = "category_code") CategorySummary left, CategorySummary right) {
left.setAmount(left.getAmount() + right.getAmount());
left.setSellingPrice(left.getSellingPrice() + right.getSellingPrice());
}
}
Geben Sie gemäß DFD-Design "Verkaufsdetails ( Verkäufe
) "und" Produktstamm (Artikel
)" ein und stellen Sie eine Verbindung zu "1. Produktstamm ( joinItemInfo
) kombinieren "" 2. Nach Kategorie aggregieren (summarizeByCategory
)" Implementieren, um das Verarbeitungsergebnis an "CategorySummary`" auszugeben.
src/main/example/jobflow/SummarizeSalesJob.java
package com.example.jobflow;
import com.asakusafw.vocabulary.flow.Export;
import com.asakusafw.vocabulary.flow.FlowDescription;
import com.asakusafw.vocabulary.flow.Import;
import com.asakusafw.vocabulary.flow.In;
import com.asakusafw.vocabulary.flow.JobFlow;
import com.asakusafw.vocabulary.flow.Out;
import com.asakusafw.vocabulary.flow.util.CoreOperatorFactory;
import com.example.modelgen.dmdl.model.CategorySummary;
import com.example.modelgen.dmdl.model.ItemInfo;
import com.example.modelgen.dmdl.model.SalesDetail;
import com.example.operator.SummarizeSalesOperatorFactory;
import com.example.operator.SummarizeSalesOperatorFactory.JoinItemInfo;
import com.example.operator.SummarizeSalesOperatorFactory.SummarizeByCategory;
@JobFlow(name = "summarizeSalesJob")
public class SummarizeSalesJob extends FlowDescription {
final In<SalesDetail> sales;
final In<ItemInfo> item;
final Out<CategorySummary> categorySummary;
public SummarizeSalesJob(
@Import(name = "sales", description = SalesDetailFromJDBC.class)
In<SalesDetail> sales,
@Import(name = "item", description = ItemInfoFromJDBC.class)
In<ItemInfo> item,
@Export(name = "result", description = CategorySummaryToJDBC.class)
Out<CategorySummary> categorySummary) {
this.sales = sales;
this.item = item;
this.categorySummary = categorySummary;
}
@Override
protected void describe() {
CoreOperatorFactory core = new CoreOperatorFactory();
SummarizeSalesOperatorFactory operator = new SummarizeSalesOperatorFactory();
JoinItemInfo joinedItem
= operator.joinItemInfo(item, core.restructure(sales, CategorySummary.class));
core.stop(joinedItem.missed);
SummarizeByCategory summarized = operator.summarizeByCategory(joinedItem.updated);
categorySummary.add(summarized.out);
Implementieren Sie eine Stapelklasse, die den Jobfluss ausführt (SummarizeSalesJob
).
src/main/java/com/example/batch/SummarizeBatch.java
package com.example.batch;
import com.asakusafw.vocabulary.batch.Batch;
import com.asakusafw.vocabulary.batch.BatchDescription;
import com.example.jobflow.SummarizeSalesJob;
@Batch(name = "example.summarizeSales")
public class SummarizeBatch extends BatchDescription {
@Override
protected void describe() {
run(SummarizeSalesJob.class).soon();
}
}
Installieren Sie die für die M3BP-Build-Umgebung erforderlichen Pakete im Voraus.
sudo yum -y install cmake
sudo yum -y install make
sudo yum -y install gcc-c++
sudo yum -y install hwloc
Setzen Sie die Umgebungsvariable ASAKUSA_HOME
in .bash_profile
usw. wie im folgenden Beispiel gezeigt.
.bash_profile
export ASAKUSA_HOME=$HOME/asakusa
Führen Sie den Befehl "gradle assemble" aus dem Projektordner aus, um eine Bereitstellungsarchivdatei für M3BP zu erstellen. Erweitern Sie die erstellte Datei im Pfad der Umgebungsvariablen "ASAKUSA_HOME" und führen Sie den Befehl "setup.jar" aus.
gradle assemble
rm -rf $ASAKUSA_HOME
mkdir -p $ASAKUSA_HOME
cp ./build/asakusafw-*.tar.gz $ASAKUSA_HOME
cd $ASAKUSA_HOME
tar xzf asakusafw-*.tar.gz
java -jar $ASAKUSA_HOME/tools/bin/setup.jar
Führen Sie die Anwendung auf M3BP mit der Stapel-ID als Argument in [YAESS] aus (http://docs.asakusafw.com/latest/release/ja/html/yaess/index.html). Geben Sie "2017-04-01" für den Stapelparameter "DATE" an. Gemäß den in "SalesDetailFromJDBC.java" der Eingabedefinitionsklasse festgelegten Bedingungen werden die Verkaufsdetaildaten des im Stapelargument angegebenen Datums verarbeitet.
$ASAKUSA_HOME/yaess/bin/yaess-batch.sh m3bp.example.summarizeSales -A DATE=2017-04-01
Das Ausführungsergebnis wird in der Tabelle CATEGORY_SUMMARY
registriert.
mysql -u demo -p demo -e 'select * from CATEGORY_SUMMARY';
Enter password:
+---------------+--------------------------+--------+---------------+
| CATEGORY_CODE | CATEGORY_NAME | AMOUNT | SELLING_PRICE |
+---------------+--------------------------+--------+---------------+
| 1600 |Praline| 11 | 2220 |
| 1401 |Cup Nudel| 5 | 490 |
+---------------+--------------------------+--------+---------------+
Im vorherigen Artikel (Hello, World!), Vanilla unter Windows und im vorherigen Artikel (Asakusa Framework und Impala sind mit BI-Tools verknüpft) Visualize) hat Asakusas Ausführungs-Engine für Spark on Hadoop eingeführt, diesmal M3BP. Wie Sie den bisherigen Artikeln entnehmen können, muss der Quellcode nicht geändert werden, und Sie können die Ausführungs-Engine einfach durch Ändern der Plug-In-Einstellungen von build.gradle
ändern.
Wir hoffen, dass dies hilfreich ist, wenn Sie überlegen, welche Art von Architektur für Ihre Datengröße am besten geeignet ist.