Linking Asakusa on M3BP with a DB

Introduction

In the previous article, Integrating Asakusa Framework and Impala to visualize with BI tools, we introduced how to implement an architecture that processes large-scale data on Hadoop. This time, we introduce how to link business data in a database with Asakusa on M3BP. The figure below summarizes the features of this architecture.

m3bp_windgate.png

What you want to do in this article

The sample application inputs the "Sales Details" table and the "Product Master" table from the DB, aggregates the sales amount for each category, and outputs it to the "Sales Aggregation by Category" table.

Processing "1. Product master join": join the product master to the sales details using the product code as a key.

Processing "2. Aggregation by category": sum the quantity and sales amount using the category code as a key.

summarizeSales_dfd.png
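To make the two steps concrete, here is a minimal plain-Java sketch of the same join-then-fold computation, using only JDK collections and a few of the sample rows. It is independent of the Asakusa APIs; the record types and map layout are illustrative assumptions, not the framework's data model.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SummarizeSketch {
    // Stand-ins for the tables: the master maps a product code to its
    // category code and unit price; a sale carries a product code and quantity.
    record Sale(String itemCode, int amount) {}
    record Item(String categoryCode, int unitPrice) {}

    public static void main(String[] args) {
        Map<String, Item> master = Map.of(
                "4922010001000", new Item("1600", 120),
                "4922020002000", new Item("1401", 98));
        List<Sale> sales = List.of(
                new Sale("4922010001000", 3),
                new Sale("4922010001000", 1),
                new Sale("4922020002000", 3));

        // 1. Product master join: attach the category and compute the selling price.
        // 2. Aggregation by category: fold quantity and selling price per category.
        Map<String, int[]> byCategory = new LinkedHashMap<>();
        for (Sale s : sales) {
            Item i = master.get(s.itemCode()); // unmatched records would be dropped
            int[] acc = byCategory.computeIfAbsent(i.categoryCode(), k -> new int[2]);
            acc[0] += s.amount();                 // AMOUNT
            acc[1] += s.amount() * i.unitPrice(); // SELLING_PRICE
        }
        byCategory.forEach((c, v) ->
                System.out.println(c + " amount=" + v[0] + " sellingPrice=" + v[1]));
    }
}
```

For these three rows this prints per-category totals of 4 / 480 for category 1600 and 3 / 294 for category 1401; the real batch does the same thing distributed over the full tables.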

The I / O table definitions are as follows:

Sales details (SALES_DETAIL):

Item | Type | PK
Sales date and time (SALES_DATE_TIME) | DATETIME | ○
Product code (ITEM_CODE) | VARCHAR(13) | ○
Quantity (AMOUNT) | INT |

Product master (ITEM_INFO):

Item | Type | PK
Product code (ITEM_CODE) | VARCHAR(13) | ○
Product name (ITEM_NAME) | VARCHAR(128) |
Category code (CATEGORY_CODE) | CHAR(4) |
Category name (CATEGORY_NAME) | VARCHAR(128) |
Unit price (UNIT_PRICE) | INT |
Master registration date (REGISTERED_DATE) | DATE |
Master application start date (BEGIN_DATE) | DATE | ○
Master application end date (END_DATE) | DATE |

Total sales by category (CATEGORY_SUMMARY):

Item | Type | PK
Category code (CATEGORY_CODE) | CHAR(4) |
Category name (CATEGORY_NAME) | VARCHAR(128) |
Quantity (AMOUNT) | INT |
Sales amount (SELLING_PRICE) | INT |

Operating environment

We have confirmed the operation in the following environment.

As preliminary work, the above software (MySQL, JDK, and Gradle) must be installed on CentOS.

The source code is on GitHub.

DB data preparation

demo.sql


CREATE DATABASE demo;
CREATE USER demo@'%' IDENTIFIED BY 'demo';
GRANT ALL ON demo.* TO demo@'%';
CREATE USER demo@localhost IDENTIFIED BY 'demo';
GRANT ALL ON demo.* TO demo@localhost;

CREATE TABLE demo.SALES_DETAIL
(
	SALES_DATE_TIME DATETIME NOT NULL COMMENT 'Sales date and time',
	ITEM_CODE VARCHAR(13) NOT NULL COMMENT 'Product code',
	AMOUNT INT NOT NULL COMMENT 'quantity',
	PRIMARY KEY(SALES_DATE_TIME, ITEM_CODE)
) COMMENT = 'Sales details';

CREATE TABLE demo.ITEM_INFO
(
	ITEM_CODE VARCHAR(13) NOT NULL COMMENT 'Product code',
	ITEM_NAME VARCHAR(128) COMMENT 'Product name',
	CATEGORY_CODE CHAR(4) NOT NULL COMMENT 'Category code',
	CATEGORY_NAME VARCHAR(128) COMMENT 'Category name',
	UNIT_PRICE INT NOT NULL COMMENT 'unit price',
	REGISTERED_DATE DATE NOT NULL COMMENT 'Master registration date',
	BEGIN_DATE DATE NOT NULL COMMENT 'Master application start date',
	END_DATE DATE NOT NULL COMMENT 'Master application end date',
	PRIMARY KEY(ITEM_CODE, BEGIN_DATE)
) COMMENT = 'Product master';

CREATE TABLE demo.CATEGORY_SUMMARY
(
	CATEGORY_CODE CHAR(4) NOT NULL COMMENT 'Category code',
	CATEGORY_NAME VARCHAR(128) COMMENT 'Category name',
	AMOUNT INT NOT NULL COMMENT 'quantity',
	SELLING_PRICE INT NOT NULL COMMENT 'Sales amount'
) COMMENT = 'Total sales by category';

INSERT INTO SALES_DETAIL VALUES ('2017-03-31 23:59:59','4922010001000',3);
INSERT INTO SALES_DETAIL VALUES ('2017-04-01 10:30:00','4922010001000',3);
INSERT INTO SALES_DETAIL VALUES ('2017-04-01 10:31:00','4922010001001',2);
INSERT INTO SALES_DETAIL VALUES ('2017-04-01 10:32:00','4922010001000',1);
INSERT INTO SALES_DETAIL VALUES ('2017-04-01 10:33:00','4922010001002',1);
INSERT INTO SALES_DETAIL VALUES ('2017-04-01 10:35:00','4922020002000',3);
INSERT INTO SALES_DETAIL VALUES ('2017-04-01 10:36:00','4922020002001',1);
INSERT INTO SALES_DETAIL VALUES ('2017-04-01 10:38:00','4922020002000',1);
INSERT INTO SALES_DETAIL VALUES ('2017-04-01 10:39:00','4922010001002',1);
INSERT INTO SALES_DETAIL VALUES ('2017-04-01 10:41:00','4922010001001',2);
INSERT INTO SALES_DETAIL VALUES ('2017-04-01 10:42:00','4922010001002',1);
INSERT INTO SALES_DETAIL VALUES ('2017-04-02 10:10:00','4922020002002',2);

INSERT INTO ITEM_INFO VALUES ('4922010001000','Milk chocolate M','1600','chocolate candy',120,'2017-04-01','2017-04-01','2018-01-19');
INSERT INTO ITEM_INFO VALUES ('4922010001000','Milk chocolate M','1600','chocolate candy',130,'2018-01-20','2018-01-20','2019-12-31');
INSERT INTO ITEM_INFO VALUES ('4922010001001','PREMIUM Assorted Chocolate','1600','chocolate candy',330,'2017-04-01','2017-04-01','2019-12-31');
INSERT INTO ITEM_INFO VALUES ('4922010001002','Almond crunch mini','1600','chocolate candy',140,'2017-04-01','2017-04-01','2019-12-31');
INSERT INTO ITEM_INFO VALUES ('4922020002000','Cup noodle soy sauce','1401','Cup Noodle',98,'2017-04-01','2017-04-01','2019-12-31');
INSERT INTO ITEM_INFO VALUES ('4922020002001','Cup noodle salt','1401','Cup Noodle',98,'2017-04-01','2017-04-01','2019-12-31');
INSERT INTO ITEM_INFO VALUES ('4922020002002','Cup noodle curry','1401','Cup Noodle',120,'2017-04-01','2017-04-01','2019-12-31');

commit;

Apply the created SQL file:

mysql -u demo -p demo < demo.sql

Development environment preparation

Create a project folder. In this sample, we work in the following folder.

mkdir asakusa-example-windgate

Create a Gradle build script under the project folder. The main additions are the asakusafw-m3bp plugin, the WindGate JDBC direct mode option, and the MySQL Connector/J dependency.

For more information on Gradle scripts, see Asakusa Gradle Plugin Reference (http://docs.asakusafw.com/latest/release/ja/html/application/gradle-plugin-reference.html).

build.gradle


group 'com.example'

buildscript {
    repositories {
        maven { url 'http://asakusafw.s3.amazonaws.com/maven/releases' }
        maven { url 'http://asakusafw.s3.amazonaws.com/maven/snapshots' }
    }
    dependencies {
        classpath group: 'com.asakusafw.gradle', name: 'asakusa-distribution', version: '0.10.0'
    }
}

apply plugin: 'asakusafw-sdk'
apply plugin: 'asakusafw-organizer'
apply plugin: 'asakusafw-m3bp'
apply plugin: 'eclipse'

asakusafw {
    m3bp {
        option 'm3bp.native.cmake.CMAKE_TOOLCHAIN_FILE', System.getProperty('m3bp.toolchain')
        option 'windgate.jdbc.direct', '*'
    }
}

asakusafwOrganizer {
    profiles.prod {
        hadoop.embed true
        assembly.into('.') {
            put 'src/dist/prod'
        }
    }
    extension {
        libraries += ['mysql:mysql-connector-java:5.1.45']
    }
}

Under the project folder, execute the following Gradle command to import the project into Eclipse. If you want to use IntelliJ IDEA, refer to Using Official IntelliJ. If you use Eclipse, consider using Shafu (an Eclipse plug-in for Asakusa development).

gradle eclipse

M3BP settings

M3BP settings are described in the m3bp.properties file. This section shows the JDBC driver settings for WindGate JDBC direct mode. The "example" segment in the property keys below corresponds to the profileName set in the I/O definition classes. For more information, see Asakusa on M3BP Optimization Settings.

src/dist/prod/m3bp/conf/m3bp.properties


com.asakusafw.dag.jdbc.example.url=jdbc:mysql://localhost/demo
com.asakusafw.dag.jdbc.example.driver=com.mysql.jdbc.Driver
com.asakusafw.dag.jdbc.example.properties.user=demo
com.asakusafw.dag.jdbc.example.properties.password=demo

Data model class generation

Create a DMDL (Data Model Definition Language) script file. It defines the input models "Sales details" and "Product master" and the output model "Total sales by category". Specify @windgate.jdbc.column on each property that corresponds to a database column. For more information on the WindGate settings, see Automatically generate DataModelJdbcSupport.

src/main/dmdl/models.dmdl


"Sales details"
@windgate.jdbc.table(
    name = "demo.SALES_DETAIL"
)
sales_detail = {

    "Sales date and time"
    @windgate.jdbc.column(name = "SALES_DATE_TIME")
    sales_date_time : DATETIME;

    "Product code"
    @windgate.jdbc.column(name = "ITEM_CODE")
    item_code : TEXT;

    "quantity"
    @windgate.jdbc.column(name = "AMOUNT")
    amount : INT;
};

"Product master"
@windgate.jdbc.table(
    name = "demo.ITEM_INFO"
)
item_info = {

    "Product code"
    @windgate.jdbc.column(name = "ITEM_CODE")
    item_code : TEXT;

    "Product name"
    @windgate.jdbc.column(name = "ITEM_NAME")
    item_name : TEXT;

    "Product category code"
    @windgate.jdbc.column(name = "CATEGORY_CODE")
    category_code : TEXT;

    "Product category name"
    @windgate.jdbc.column(name = "CATEGORY_NAME")
    category_name : TEXT;

    "unit price"
    @windgate.jdbc.column(name = "UNIT_PRICE")
    unit_price : INT;

    "Master registration date"
    @windgate.jdbc.column(name = "REGISTERED_DATE")
    registered_date : DATE;

    "Master application start date"
    @windgate.jdbc.column(name = "BEGIN_DATE")
    begin_date : DATE;

    "Master application end date"
    @windgate.jdbc.column(name = "END_DATE")
    end_date : DATE;
};

"Total sales by category"
@windgate.jdbc.table(
    name = "demo.CATEGORY_SUMMARY"
)
category_summary = {

    sales_date_time : DATETIME;

    item_code : TEXT;

    @windgate.jdbc.column(name = "CATEGORY_CODE")
    category_code : TEXT;

    @windgate.jdbc.column(name = "CATEGORY_NAME")
    category_name : TEXT;

    @windgate.jdbc.column(name = "AMOUNT")
    amount : INT;

    @windgate.jdbc.column(name = "SELLING_PRICE")
    selling_price : INT;
};

Executing the following Gradle command generates the data model classes used by Asakusa Framework from the script file you created. (Run it under the project folder.)

gradle compileDMDL

I / O definition class

In each I/O definition class, set profileName to associate the class with the JDBC settings defined in the m3bp.properties file. In addition, a conditional expression corresponding to a SQL WHERE clause is added to SalesDetailFromJDBC.java for the sales details table. The batch argument value is substituted into ${DATE} at run time.

src/main/java/com/example/jobflow/SalesDetailFromJDBC.java


package com.example.jobflow;

import com.example.modelgen.dmdl.jdbc.AbstractSalesDetailJdbcImporterDescription;

public class SalesDetailFromJDBC extends AbstractSalesDetailJdbcImporterDescription {

    @Override
    public String getProfileName() {
        return "example";
    }

    @Override
    public String getCondition() {
        return "SALES_DATE_TIME between '${DATE} 00:00:00' and '${DATE} 23:59:59'";
    }
}
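Conceptually, the ${DATE} placeholder in the condition above is expanded with the batch argument before the query runs. The following is a simplified sketch of that substitution, not the actual WindGate implementation:

```java
public class ConditionExpansion {
    public static void main(String[] args) {
        // The condition string from SalesDetailFromJDBC.getCondition()
        String condition =
                "SALES_DATE_TIME between '${DATE} 00:00:00' and '${DATE} 23:59:59'";
        // The value later passed on the command line as -A DATE=2017-04-01
        String date = "2017-04-01";
        String expanded = condition.replace("${DATE}", date);
        System.out.println(expanded);
        // SALES_DATE_TIME between '2017-04-01 00:00:00' and '2017-04-01 23:59:59'
    }
}
```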

src/main/java/com/example/jobflow/ItemInfoFromJDBC.java


package com.example.jobflow;

import com.example.modelgen.dmdl.jdbc.AbstractItemInfoJdbcImporterDescription;

public class ItemInfoFromJDBC extends AbstractItemInfoJdbcImporterDescription {

    @Override
    public String getProfileName() {
        return "example";
    }
}

src/main/java/com/example/jobflow/CategorySummaryToJDBC.java


package com.example.jobflow;

import com.example.modelgen.dmdl.jdbc.AbstractCategorySummaryJdbcExporterDescription;

public class CategorySummaryToJDBC extends AbstractCategorySummaryJdbcExporterDescription {

    @Override
    public String getProfileName() {
        return "example";
    }
}

Creating an operator class

Of the processes described in the DFD, "1. Product master join" and "2. Aggregation by category" are implemented as operators. "1. Product master join" is implemented by the joinItemInfo method (MasterJoinUpdate operator, http://docs.asakusafw.com/latest/release/ja/html/dsl/operators.html#master-join-update-operator). Since the product code alone does not uniquely identify a master record, a master selection condition is added with the selectAvailableItem method (MasterSelection auxiliary operator, http://docs.asakusafw.com/latest/release/ja/html/dsl/operators.html#master-selection-support-operator). "2. Aggregation by category" is implemented by the summarizeByCategory method (Fold operator), which folds the records with the category code as the key, summing the quantity and the sales amount.

src/main/java/com/example/operator/SummarizeSalesOperator.java


package com.example.operator;

import java.util.List;

import com.asakusafw.runtime.value.Date;
import com.asakusafw.runtime.value.DateTime;
import com.asakusafw.runtime.value.DateUtil;
import com.asakusafw.vocabulary.model.Key;
import com.asakusafw.vocabulary.operator.Fold;
import com.asakusafw.vocabulary.operator.MasterJoinUpdate;
import com.asakusafw.vocabulary.operator.MasterSelection;
import com.example.modelgen.dmdl.model.CategorySummary;
import com.example.modelgen.dmdl.model.ItemInfo;

public abstract class SummarizeSalesOperator {
    private final Date dateBuffer = new Date();

    @MasterSelection
    public ItemInfo selectAvailableItem(List<ItemInfo> candidates, CategorySummary sales) {
        DateTime dateTime = sales.getSalesDateTime();
        dateBuffer.setElapsedDays(DateUtil.getDayFromDate(
                dateTime.getYear(), dateTime.getMonth(), dateTime.getDay()));
        for (ItemInfo item : candidates) {
            if (item.getBeginDate().compareTo(dateBuffer) <= 0
                    && dateBuffer.compareTo(item.getEndDate()) <= 0) {
                return item;
            }
        }
        return null;
    }

    @MasterJoinUpdate(selection = "selectAvailableItem")
    public void joinItemInfo(
            @Key(group = "item_code") ItemInfo info,
            @Key(group = "item_code") CategorySummary sales) {
        sales.setCategoryCodeOption(info.getCategoryCodeOption());
        sales.setCategoryNameOption(info.getCategoryNameOption());
        sales.setSellingPrice(sales.getAmount() * info.getUnitPrice());
    }

    @Fold
    public void summarizeByCategory(@Key(group = "category_code") CategorySummary left, CategorySummary right) {
        left.setAmount(left.getAmount() + right.getAmount());
        left.setSellingPrice(left.getSellingPrice() + right.getSellingPrice());
    }
}
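The master-selection rule in selectAvailableItem — pick the master record whose application period contains the sales date — can also be sketched with plain java.time types, independent of the Asakusa value classes. The record type and method names here are illustrative:

```java
import java.time.LocalDate;
import java.util.List;
import java.util.Optional;

public class MasterSelectionSketch {
    record ItemMaster(String itemCode, int unitPrice, LocalDate beginDate, LocalDate endDate) {}

    // BEGIN_DATE <= salesDate <= END_DATE, the same check as selectAvailableItem
    static Optional<ItemMaster> selectAvailable(List<ItemMaster> candidates, LocalDate salesDate) {
        return candidates.stream()
                .filter(m -> !m.beginDate().isAfter(salesDate)
                          && !salesDate.isAfter(m.endDate()))
                .findFirst();
    }

    public static void main(String[] args) {
        // The two ITEM_INFO rows for product 4922010001000 (unit price changes in 2018)
        List<ItemMaster> candidates = List.of(
                new ItemMaster("4922010001000", 120,
                        LocalDate.parse("2017-04-01"), LocalDate.parse("2018-01-19")),
                new ItemMaster("4922010001000", 130,
                        LocalDate.parse("2018-01-20"), LocalDate.parse("2019-12-31")));
        System.out.println(selectAvailable(candidates,
                LocalDate.parse("2017-04-01")).get().unitPrice()); // 120
        System.out.println(selectAvailable(candidates,
                LocalDate.parse("2018-06-01")).get().unitPrice()); // 130
    }
}
```

Note that the sample row dated 2017-03-31 matches no application period, which is exactly the case the missed output port of the join handles.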

Creating a job flow class

Following the DFD design, the inputs "Sales details (sales)" and "Product master (item)" are connected to "1. Product master join (joinItemInfo)" and "2. Aggregation by category (summarizeByCategory)", and the processing result is output to "Total sales by category (categorySummary)".

src/main/java/com/example/jobflow/SummarizeSalesJob.java


package com.example.jobflow;

import com.asakusafw.vocabulary.flow.Export;
import com.asakusafw.vocabulary.flow.FlowDescription;
import com.asakusafw.vocabulary.flow.Import;
import com.asakusafw.vocabulary.flow.In;
import com.asakusafw.vocabulary.flow.JobFlow;
import com.asakusafw.vocabulary.flow.Out;
import com.asakusafw.vocabulary.flow.util.CoreOperatorFactory;
import com.example.modelgen.dmdl.model.CategorySummary;
import com.example.modelgen.dmdl.model.ItemInfo;
import com.example.modelgen.dmdl.model.SalesDetail;
import com.example.operator.SummarizeSalesOperatorFactory;
import com.example.operator.SummarizeSalesOperatorFactory.JoinItemInfo;
import com.example.operator.SummarizeSalesOperatorFactory.SummarizeByCategory;

@JobFlow(name = "summarizeSalesJob")
public class SummarizeSalesJob extends FlowDescription {
    final In<SalesDetail> sales;
    final In<ItemInfo> item;
    final Out<CategorySummary> categorySummary;
    public SummarizeSalesJob(
            @Import(name = "sales", description = SalesDetailFromJDBC.class)
            In<SalesDetail> sales,
            @Import(name = "item", description = ItemInfoFromJDBC.class)
            In<ItemInfo> item,
            @Export(name = "result", description = CategorySummaryToJDBC.class)
            Out<CategorySummary> categorySummary) {
        this.sales = sales;
        this.item = item;
        this.categorySummary = categorySummary;
    }

    @Override
    protected void describe() {
        CoreOperatorFactory core = new CoreOperatorFactory();
        SummarizeSalesOperatorFactory operator = new SummarizeSalesOperatorFactory();

        JoinItemInfo joinedItem
                = operator.joinItemInfo(item, core.restructure(sales, CategorySummary.class));
        core.stop(joinedItem.missed);
        SummarizeByCategory summarized = operator.summarizeByCategory(joinedItem.updated);
        categorySummary.add(summarized.out);
    }
}

Creating a batch class

Implement a batch class that executes the job flow (SummarizeSalesJob).

src/main/java/com/example/batch/SummarizeBatch.java


package com.example.batch;

import com.asakusafw.vocabulary.batch.Batch;
import com.asakusafw.vocabulary.batch.BatchDescription;
import com.example.jobflow.SummarizeSalesJob;

@Batch(name = "example.summarizeSales")
public class SummarizeBatch extends BatchDescription {

    @Override
    protected void describe() {
        run(SummarizeSalesJob.class).soon();
    }
}

Build and deploy application

Install the necessary packages for the M3BP build environment in advance.

sudo yum -y install cmake
sudo yum -y install make
sudo yum -y install gcc-c++
sudo yum -y install hwloc

Set the environment variable ASAKUSA_HOME in .bash_profile etc. as shown in the example below.

.bash_profile


export ASAKUSA_HOME=$HOME/asakusa

Run the gradle assemble command from the project folder to create a deployment archive file for M3BP. Extract the created file under the path pointed to by the ASAKUSA_HOME environment variable, then run the setup.jar command.

gradle assemble
rm -rf $ASAKUSA_HOME
mkdir -p $ASAKUSA_HOME
cp ./build/asakusafw-*.tar.gz $ASAKUSA_HOME
cd $ASAKUSA_HOME
tar xzf asakusafw-*.tar.gz
java -jar $ASAKUSA_HOME/tools/bin/setup.jar

Run application

Execute the application on M3BP by passing the batch ID as an argument to YAESS. Specify 2017-04-01 for the batch argument DATE. According to the condition set in SalesDetailFromJDBC.java of the input definition class, only the sales detail data for the date given in the batch argument is processed.

$ASAKUSA_HOME/yaess/bin/yaess-batch.sh m3bp.example.summarizeSales -A DATE=2017-04-01

The execution result is registered in the CATEGORY_SUMMARY table.

mysql -u demo -p demo -e 'select * from CATEGORY_SUMMARY';
Enter password: 
+---------------+--------------------------+--------+---------------+
| CATEGORY_CODE | CATEGORY_NAME            | AMOUNT | SELLING_PRICE |
+---------------+--------------------------+--------+---------------+
| 1600          | chocolate candy          |     11 |          2220 |
| 1401          | Cup Noodle               |      5 |           490 |
+---------------+--------------------------+--------+---------------+
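As a cross-check, the totals above can be recomputed by hand from the sample data: for DATE=2017-04-01 each product's quantities are summed and multiplied by the unit price of the master record valid on that date.

```java
public class ResultCheck {
    public static void main(String[] args) {
        // Category 1600 on 2017-04-01:
        //   4922010001000 (unit 120) x4, 4922010001001 (330) x4, 4922010001002 (140) x3
        int amount1600 = 4 + 4 + 3;
        int price1600 = 4 * 120 + 4 * 330 + 3 * 140;
        // Category 1401 on 2017-04-01:
        //   4922020002000 (98) x4, 4922020002001 (98) x1
        int amount1401 = 4 + 1;
        int price1401 = 4 * 98 + 1 * 98;
        System.out.println("1600 " + amount1600 + " " + price1600); // 1600 11 2220
        System.out.println("1401 " + amount1401 + " " + price1401); // 1401 5 490
    }
}
```

These match the CATEGORY_SUMMARY rows, confirming that the batch processed only the 2017-04-01 sales and used the masters valid on that date.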

Finally

In earlier articles we introduced other Asakusa execution engines: Asakusa Vanilla on Windows (Hello, World!) and Spark on Hadoop (Integrating Asakusa Framework and Impala to visualize with BI tools); this time it was M3BP. As these articles show, there is no need to modify the source code: you can switch execution engines just by changing the plugin settings in build.gradle. We hope this is helpful when considering which architecture best fits your data size.
