Dans l'article précédent Integrating Asakusa Framework and Impala to visualize with BI tools, traitement de données à grande échelle sur Hadoop Introduction de la méthode de mise en œuvre de l'architecture en supposant. Cette fois, nous présenterons la méthode d'implémentation pour relier les données commerciales de la base de données à Asakusa sur M3BP. .. La figure ci-dessous résume les caractéristiques de l'architecture.
L'exemple d'application entre le tableau "Détails des ventes" et le tableau "Produit principal" de la base de données, agrège le montant des ventes pour chaque catégorie et le sort dans le tableau "Agrégation des ventes par catégorie".
Traitement de "1. Combinaison de fiches produits"
Traitement de "2. Agrégation par catégorie" Additionner la quantité et le montant des ventes en utilisant le code de catégorie comme clé
Les définitions des tables d'entrée et de sortie sont les suivantes:
article | Moule | PK |
---|---|---|
Date et heure de vente (SALES_DATE_TIME) | DATETIME | 〇 |
Code produit (ITEM_CODE) | VARCHAR(13) | 〇 |
Quantité (AMOUNT) | INT |
article | Moule | PK |
---|---|---|
Code produit (ITEM_CODE) | VARCHAR(13) | 〇 |
Nom du produit (ITEM_NAME) | VARCHAR(128) | |
Code de catégorie (CATÉGORIE_CODE) | CHAR(4) | |
Nom de la catégorie (CATÉGORIE_NAME) | CHAR(128) | |
Prix unitaire (UNIT_PRICE) | INT | |
Date d'enregistrement principal (ENREGISTRÉ)_DATE) | DATE | |
Date de début de la demande principale (BEGIN)_DATE) | DATE | 〇 |
Date de fin de la demande principale (FIN_DATE) | DATE |
article | Moule | PK |
---|---|---|
Code de catégorie (CATÉGORIE_CODE) | CHAR(4) | |
Nom de la catégorie (CATÉGORIE_NAME) | CHAR(128) | |
Quantité (AMOUNT) | INT | |
Montant des ventes (VENTE_PRICE) | INT |
Nous avons confirmé l'opération dans l'environnement suivant.
Comme travail préliminaire, le logiciel ci-dessus (MySQL
, JDK
, Gradle
) doit être installé sur CentOS.
Le code source peut être trouvé sur GitHub.
demo.sql
CREATE DATABASE demo;
CREATE USER demo@'%' IDENTIFIED BY 'demo';
GRANT ALL ON demo.* TO demo@'%';
CREATE USER demo@localhost IDENTIFIED BY 'demo';
GRANT ALL ON demo.* TO demo@localhost;
CREATE TABLE demo.SALES_DETAIL
(
SALES_DATE_TIME DATETIME NOT NULL COMMENT 'Date et heure de vente',
ITEM_CODE VARCHAR(13) NOT NULL COMMENT 'Code produit',
AMOUNT INT NOT NULL COMMENT 'quantité',
PRIMARY KEY(SALES_DATE_TIME, ITEM_CODE)
) COMMENT = 'Détails des ventes';
CREATE TABLE demo.ITEM_INFO
(
ITEM_CODE VARCHAR(13) NOT NULL COMMENT 'Code produit',
ITEM_NAME VARCHAR(128) COMMENT 'Nom du produit',
CATEGORY_CODE CHAR(4) NOT NULL COMMENT 'Code de catégorie',
CATEGORY_NAME VARCHAR(128) COMMENT 'Nom de catégorie',
UNIT_PRICE INT NOT NULL COMMENT 'Prix unitaire',
REGISTERED_DATE DATE NOT NULL COMMENT 'Date d'enregistrement principal',
BEGIN_DATE DATE NOT NULL COMMENT 'Date de début de la demande principale',
END_DATE DATE NOT NULL COMMENT 'Date de fin de la demande principale',
PRIMARY KEY(ITEM_CODE, BEGIN_DATE)
) COMMENT = 'Maître de produit';
CREATE TABLE demo.CATEGORY_SUMMARY
(
CATEGORY_CODE CHAR(4) NOT NULL COMMENT 'Code de catégorie',
CATEGORY_NAME VARCHAR(128) COMMENT 'Nom de catégorie',
AMOUNT INT NOT NULL COMMENT 'quantité',
SELLING_PRICE INT NOT NULL COMMENT 'Montant des ventes'
) COMMENT = 'Ventes totales par catégorie';
INSERT INTO SALES_DETAIL VALUES ('2017-03-31 23:59:59','4922010001000',3);
INSERT INTO SALES_DETAIL VALUES ('2017-04-01 10:30:00','4922010001000',3);
INSERT INTO SALES_DETAIL VALUES ('2017-04-01 10:31:00','4922010001001',2);
INSERT INTO SALES_DETAIL VALUES ('2017-04-01 10:32:00','4922010001000',1);
INSERT INTO SALES_DETAIL VALUES ('2017-04-01 10:33:00','4922010001002',1);
INSERT INTO SALES_DETAIL VALUES ('2017-04-01 10:35:00','4922020002000',3);
INSERT INTO SALES_DETAIL VALUES ('2017-04-01 10:36:00','4922020002001',1);
INSERT INTO SALES_DETAIL VALUES ('2017-04-01 10:38:00','4922020002000',1);
INSERT INTO SALES_DETAIL VALUES ('2017-04-01 10:39:00','4922010001002',1);
INSERT INTO SALES_DETAIL VALUES ('2017-04-01 10:41:00','4922010001001',2);
INSERT INTO SALES_DETAIL VALUES ('2017-04-01 10:42:00','4922010001002',1);
INSERT INTO SALES_DETAIL VALUES ('2017-04-02 10:10:00','4922020002002',2);
INSERT INTO ITEM_INFO VALUES ('4922010001000','Chocolat au lait M','1600','bonbons au chocolat',120,'2017-04-01','2017-04-01','2018-01-19');
INSERT INTO ITEM_INFO VALUES ('4922010001000','Chocolat au lait M','1600','bonbons au chocolat',130,'2018-01-20','2018-01-20','2019-12-31');
INSERT INTO ITEM_INFO VALUES ('4922010001001','Chocolat assorti PREMIUM','1600','bonbons au chocolat',330,'2017-04-01','2017-04-01','2019-12-31');
INSERT INTO ITEM_INFO VALUES ('4922010001002','Mini croquant aux amandes','1600','bonbons au chocolat',140,'2017-04-01','2017-04-01','2019-12-31');
INSERT INTO ITEM_INFO VALUES ('4922020002000','Coupe de sauce soja aux nouilles','1401','Coupe de nouilles',98,'2017-04-01','2017-04-01','2019-12-31');
INSERT INTO ITEM_INFO VALUES ('4922020002001','Coupe de sel de nouilles','1401','Coupe de nouilles',98,'2017-04-01','2017-04-01','2019-12-31');
INSERT INTO ITEM_INFO VALUES ('4922020002002','Coupe de nouilles au curry','1401','Coupe de nouilles',120,'2017-04-01','2017-04-01','2019-12-31');
commit;
Reflète le fichier SQL créé.
mysql -u demo -p demo < demo.sql
Créez un dossier de projet. Dans cet exemple, nous travaillerons sur les dossiers suivants.
mkdir asakusa-example-windgate
Créez un fichier de script Gradle sous le dossier du projet. Les paramètres suivants sont principalement ajoutés.
Pour plus d'informations sur les scripts Gradle, reportez-vous à la référence du plug-in Asakusa Gradle (http://docs.asakusafw.com/latest/release/ja/html/application/gradle-plugin-reference.html).
build.gradle
group 'com.example'
buildscript {
repositories {
maven { url 'http://asakusafw.s3.amazonaws.com/maven/releases' }
maven { url 'http://asakusafw.s3.amazonaws.com/maven/snapshots' }
}
dependencies {
classpath group: 'com.asakusafw.gradle', name: 'asakusa-distribution', version: '0.10.0'
}
}
apply plugin: 'asakusafw-sdk'
apply plugin: 'asakusafw-organizer'
apply plugin: 'asakusafw-m3bp'
apply plugin: 'eclipse'
asakusafw {
m3bp {
option 'm3bp.native.cmake.CMAKE_TOOLCHAIN_FILE', System.getProperty('m3bp.toolchain')
option 'windgate.jdbc.direct', '*'
}
}
asakusafwOrganizer {
profiles.prod {
hadoop.embed true
assembly.into('.') {
put 'src/dist/prod'
}
}
extension {
libraries += ['mysql:mysql-connector-java:5.1.45']
}
}
Sous le dossier du projet, exécutez la commande Gradle ci-dessous pour l'importer en tant que projet Eclipse. Si vous souhaitez utiliser IntelliJ IDEA, veuillez vous reporter à Utilisation d'IntelliJ officiel. Si vous utilisez Eclipse, pensez à utiliser Shafu (plug-in Eclipse pour le développement Asakusa).
gradle eclipse
Les paramètres de M3BP sont décrits dans le fichier m3bp.properties
. Cette section décrit les paramètres liés au pilote JDBC pour WindGate JDBC Direct Mode.
«Exemple» inclus dans la clé de propriété suivante correspond à «profileName» défini dans la classe de définition d'entrée / sortie.
Pour plus d'informations, consultez Asakusa sur les paramètres d'optimisation M3BP (http://docs.asakusafw.com/latest/release/ja/html/m3bp/optimization.html).
src/dist/prod/m3bp/conf/m3bp.properties
com.asakusafw.dag.jdbc.example.url=jdbc:mysql://localhost/demo
com.asakusafw.dag.jdbc.example.driver=com.mysql.jdbc.Driver
com.asakusafw.dag.jdbc.example.properties.user=demo
com.asakusafw.dag.jdbc.example.properties.password=demo
DMDL (Data Model Definition Language) Créez un fichier de script.
Le modèle d'entrée «détails des ventes» et «produit de base» et le modèle de sortie «agrégation des ventes par catégorie» sont définis.
Spécifiez @ windgate.jdbc.column
pour la propriété correspondant à la colonne dans la base de données.
Pour plus d'informations sur les paramètres Windgate, consultez Générer automatiquement DataModelJdbcSupport (http://docs.asakusafw.com/latest/release/ja/html/windgate/user-guide.html#datamodeljdbcsupport).
src/main/dmdl/models.dmdl
"Détails des ventes"
@windgate.jdbc.table(
name = "demo.SALES_DETAIL"
)
sales_detail = {
"Date et heure de vente"
@windgate.jdbc.column(name = "SALES_DATE_TIME")
sales_date_time : DATETIME;
"Code produit"
@windgate.jdbc.column(name = "ITEM_CODE")
item_code : TEXT;
"quantité"
@windgate.jdbc.column(name = "AMOUNT")
amount : INT;
};
"Maître de produit"
@windgate.jdbc.table(
name = "demo.ITEM_INFO"
)
item_info = {
"Code produit"
@windgate.jdbc.column(name = "ITEM_CODE")
item_code : TEXT;
"Nom du produit"
@windgate.jdbc.column(name = "ITEM_NAME")
item_name : TEXT;
"Code de catégorie de produit"
@windgate.jdbc.column(name = "CATEGORY_CODE")
category_code : TEXT;
"Nom de la catégorie de produit"
@windgate.jdbc.column(name = "CATEGORY_NAME")
category_name : TEXT;
"Prix unitaire"
@windgate.jdbc.column(name = "UNIT_PRICE")
unit_price : INT;
"Date d'enregistrement principal"
@windgate.jdbc.column(name = "REGISTERED_DATE")
registered_date : DATE;
"Date de début de la demande principale"
@windgate.jdbc.column(name = "BEGIN_DATE")
begin_date : DATE;
"Date de fin de la demande principale"
@windgate.jdbc.column(name = "END_DATE")
end_date : DATE;
};
"Ventes totales par catégorie"
@windgate.jdbc.table(
name = "demo.CATEGORY_SUMMARY"
)
category_summary = {
sales_date_time : DATETIME;
item_code : TEXT;
@windgate.jdbc.column(name = "CATEGORY_CODE")
category_code : TEXT;
@windgate.jdbc.column(name = "CATEGORY_NAME")
category_name : TEXT;
@windgate.jdbc.column(name = "AMOUNT")
amount : INT;
@windgate.jdbc.column(name = "SELLING_PRICE")
selling_price : INT;
};
Lorsque vous exécutez la commande Gradle suivante, une classe de modèle de données pouvant être utilisée par Asakusa Framework est générée en fonction du fichier de script créé. (Exécuter sous le dossier du projet)
gradle compileDMDL
Dans chaque classe de définition d'entrée / sortie, définissez profileName
à associer aux paramètres JDBC définis dans le fichier m3bp.properties
.
De plus, la classe SalesDetailFromJDBC.java
associée à la table des articles de vente a une expression conditionnelle qui correspond à la clause WHERE de SQL. La valeur de l'argument au démarrage du lot est affectée à «$ {DATE}».
src/main/java/com/example/jobflow/SalesDetailFromJDBC.java
package com.example.jobflow;
import com.example.modelgen.dmdl.jdbc.AbstractSalesDetailJdbcImporterDescription;
public class SalesDetailFromJDBC extends AbstractSalesDetailJdbcImporterDescription {
@Override
public String getProfileName() {
return "example";
}
@Override
public String getCondition() {
return "SALES_DATE_TIME between '${DATE} 00:00:00' and '${DATE} 23:59:59'";
}
}
src/main/java/com/example/jobflow/ItemInfoFromJDBC.java
package com.example.jobflow;
import com.example.modelgen.dmdl.jdbc.AbstractItemInfoJdbcImporterDescription;
public class ItemInfoFromJDBC extends AbstractItemInfoJdbcImporterDescription {
@Override
public String getProfileName() {
return "example";
}
}
src/main/java/com/example/jobflow/CategorySummaryToJDBC.java
package com.example.jobflow;
import com.example.modelgen.dmdl.jdbc.AbstractCategorySummaryJdbcExporterDescription;
public class CategorySummaryToJDBC extends AbstractCategorySummaryJdbcExporterDescription {
@Override
public String getProfileName() {
return "example";
}
}
Parmi les processus décrits dans DFD, «1. Combinaison de produits génériques» et «2. Agrégation par catégorie» sont mis en œuvre en tant qu'opérateurs.
"1. Combiner des produits maîtres" est la méthode joinItemInfo
([opérateur MasterJoinUpdate
](http://docs.asakusafw.com/latest/release/ja/html/dsl/operators.html#master-join- Il est implémenté par update-operator)). Étant donné que le code produit seul ne donne pas une combinaison équivalente, définissez la condition de sélection principale ([opérateur auxiliaire MasterSelection
](http://docs.asakusafw.com/latest/release/ja/html/dsl/operators.html#master-" Il est ajouté par selection-support-operator)).
«2. Agrégation par catégorie» est la méthode «summaryByCategory» ([opérateur «Fold») (http://docs.asakusafw.com/latest/release/ja/html/dsl/operators.html#fold-operator) ) Est implémenté. La quantité et le montant des ventes sont totalisés lors du pliage en utilisant le code de catégorie comme clé.
src/main/java/com/example/operator/SummarizeSalesOperator.java
package com.example.operator;
import java.util.List;
import com.asakusafw.runtime.value.Date;
import com.asakusafw.runtime.value.DateTime;
import com.asakusafw.runtime.value.DateUtil;
import com.asakusafw.vocabulary.model.Key;
import com.asakusafw.vocabulary.operator.Fold;
import com.asakusafw.vocabulary.operator.MasterJoinUpdate;
import com.asakusafw.vocabulary.operator.MasterSelection;
import com.example.modelgen.dmdl.model.CategorySummary;
import com.example.modelgen.dmdl.model.ItemInfo;
public abstract class SummarizeSalesOperator {
private final Date dateBuffer = new Date();
@MasterSelection
public ItemInfo selectAvailableItem(List<ItemInfo> candidates, CategorySummary sales) {
DateTime dateTime = sales.getSalesDateTime();
dateBuffer.setElapsedDays(DateUtil.getDayFromDate(
dateTime.getYear(), dateTime.getMonth(), dateTime.getDay()));
for (ItemInfo item : candidates) {
if (item.getBeginDate().compareTo(dateBuffer) <= 0
&& dateBuffer.compareTo(item.getEndDate()) <= 0) {
return item;
}
}
return null;
}
@MasterJoinUpdate(selection = "selectAvailableItem")
public void joinItemInfo(
@Key(group = "item_code") ItemInfo info,
@Key(group = "item_code") CategorySummary sales) {
sales.setCategoryCodeOption(info.getCategoryCodeOption());
sales.setCategoryNameOption(info.getCategoryNameOption());
sales.setSellingPrice(sales.getAmount() * info.getUnitPrice());
}
@Fold
public void summarizeByCategory(@Key(group = "category_code") CategorySummary left, CategorySummary right) {
left.setAmount(left.getAmount() + right.getAmount());
left.setSellingPrice(left.getSellingPrice() + right.getSellingPrice());
}
}
Selon la conception de DFD, entrez "Détails des ventes ( sales
) "et" Product master (ʻitem) "et connectez-vous à" 1. Combine product master (
joinItemInfo)" "2. Agréger par catégorie (
summaryByCategory)" Implémente pour afficher le résultat du traitement dans "CategorySummary
".
src/main/example/jobflow/SummarizeSalesJob.java
package com.example.jobflow;
import com.asakusafw.vocabulary.flow.Export;
import com.asakusafw.vocabulary.flow.FlowDescription;
import com.asakusafw.vocabulary.flow.Import;
import com.asakusafw.vocabulary.flow.In;
import com.asakusafw.vocabulary.flow.JobFlow;
import com.asakusafw.vocabulary.flow.Out;
import com.asakusafw.vocabulary.flow.util.CoreOperatorFactory;
import com.example.modelgen.dmdl.model.CategorySummary;
import com.example.modelgen.dmdl.model.ItemInfo;
import com.example.modelgen.dmdl.model.SalesDetail;
import com.example.operator.SummarizeSalesOperatorFactory;
import com.example.operator.SummarizeSalesOperatorFactory.JoinItemInfo;
import com.example.operator.SummarizeSalesOperatorFactory.SummarizeByCategory;
@JobFlow(name = "summarizeSalesJob")
public class SummarizeSalesJob extends FlowDescription {
final In<SalesDetail> sales;
final In<ItemInfo> item;
final Out<CategorySummary> categorySummary;
public SummarizeSalesJob(
@Import(name = "sales", description = SalesDetailFromJDBC.class)
In<SalesDetail> sales,
@Import(name = "item", description = ItemInfoFromJDBC.class)
In<ItemInfo> item,
@Export(name = "result", description = CategorySummaryToJDBC.class)
Out<CategorySummary> categorySummary) {
this.sales = sales;
this.item = item;
this.categorySummary = categorySummary;
}
@Override
protected void describe() {
CoreOperatorFactory core = new CoreOperatorFactory();
SummarizeSalesOperatorFactory operator = new SummarizeSalesOperatorFactory();
JoinItemInfo joinedItem
= operator.joinItemInfo(item, core.restructure(sales, CategorySummary.class));
core.stop(joinedItem.missed);
SummarizeByCategory summarized = operator.summarizeByCategory(joinedItem.updated);
categorySummary.add(summarized.out);
Implémentez une classe batch qui exécute le flux de travaux (SummarizeSalesJob
).
src/main/java/com/example/batch/SummarizeBatch.java
package com.example.batch;
import com.asakusafw.vocabulary.batch.Batch;
import com.asakusafw.vocabulary.batch.BatchDescription;
import com.example.jobflow.SummarizeSalesJob;
@Batch(name = "example.summarizeSales")
public class SummarizeBatch extends BatchDescription {
@Override
protected void describe() {
run(SummarizeSalesJob.class).soon();
}
}
Installez à l'avance les packages requis pour l'environnement de construction M3BP.
sudo yum -y install cmake
sudo yum -y install make
sudo yum -y install gcc-c++
sudo yum -y install hwloc
Définissez la variable d'environnement ʻASAKUSA_HOME dans
.bash_profile` etc. comme indiqué dans l'exemple ci-dessous.
.bash_profile
export ASAKUSA_HOME=$HOME/asakusa
Exécutez la commande gradle assemble
à partir du dossier du projet pour créer un fichier d'archive de déploiement pour M3BP.
Développez le fichier créé sur le chemin de la variable d'environnement ʻASAKUSA_HOME et exécutez la commande
setup.jar`.
gradle assemble
rm -rf $ASAKUSA_HOME
mkdir -p $ASAKUSA_HOME
cp ./build/asakusafw-*.tar.gz $ASAKUSA_HOME
cd $ASAKUSA_HOME
tar xzf asakusafw-*.tar.gz
java -jar $ASAKUSA_HOME/tools/bin/setup.jar
Exécutez l'application sur M3BP avec l'ID de lot comme argument dans YAESS. Spécifiez «2017-04-01» pour le paramètre de lot «DATE».
Selon les conditions définies dans SalesDetailFromJDBC.java
de la classe de définition d'entrée, les données de détail des ventes de la date spécifiée dans l'argument de lot seront traitées.
$ASAKUSA_HOME/yaess/bin/yaess-batch.sh m3bp.example.summarizeSales -A DATE=2017-04-01
Le résultat de l'exécution est enregistré dans la table CATEGORY_SUMMARY
.
mysql -u demo -p demo -e 'select * from CATEGORY_SUMMARY';
Enter password:
+---------------+--------------------------+--------+---------------+
| CATEGORY_CODE | CATEGORY_NAME | AMOUNT | SELLING_PRICE |
+---------------+--------------------------+--------+---------------+
| 1600 |bonbons au chocolat| 11 | 2220 |
| 1401 |Coupe de nouilles| 5 | 490 |
+---------------+--------------------------+--------+---------------+
Dans l'article précédent (Hello, World!), Vanilla sur Windows, et dans l'article précédent (Asakusa Framework et Impala sont liés aux outils BI) Visualize) a introduit le moteur d'exécution d'Asakusa pour Spark sur Hadoop, cette fois M3BP. Comme vous pouvez le voir dans les articles jusqu'à présent, il n'est pas nécessaire de modifier le code source, et vous pouvez changer le moteur d'exécution simplement en changeant les paramètres du plug-in de build.gradle
.
Nous espérons que cela vous sera utile lors de l'examen du type d'architecture qui convient le mieux à la taille de vos données.