In the previous article, Hello, World! with Asakusa Framework! (/index.html), I explained how to implement a simple application with the Asakusa Framework. In this article, I would like to introduce content that is closer to an actual business scenario. In the figure below, various data are accumulated on Hadoop (HDFS) and processed with Asakusa on Spark; this is an example of a data analysis platform that visualizes the generated data with BI tools. The data scale is assumed to be extremely large, on the order of tens of TB to several PB. Among these elemental technologies, this article introduces how to link Asakusa on Spark with Impala, taken here as an example of a SQL query engine.
It is assumed that the input file has already been deployed on Hadoop's HDFS. Normally, the output would be the result of some business processing, but in this sample scenario the input file is simply converted to Parquet format and written out on HDFS. The output data is defined as an Impala external table and referenced from BI tools.
The sales data model looks like this. The conversion between the Asakusa property data types and the Impala data types is described in the DMDL definition.
Data item | Asakusa data type | Impala data type |
---|---|---|
Sales date (sales_date) | DATE | TIMESTAMP |
Item code (item_code) | TEXT | STRING |
Sales quantity (amount) | INT | INT |
Sales amount (selling_price) | INT | INT |
We have confirmed the operation in the following environment.
Note that Direct I/O Hive in Asakusa Framework 0.10 uses Hive 1.2.2, so data compatibility across all Hive versions has not been verified.
The following should be completed as preliminary work.
First, create a project folder. In this sample, we will work in the following folder.
mkdir asakusa-example-parquet
Create a Gradle script file in your project folder. This time, the Asakusa on Spark Plugin is configured so that Spark is used as the execution engine. For more information on the Gradle script, see the Asakusa Gradle Plugin (http://docs.asakusafw.com/latest/release/ja/html/application/gradle-plugin-reference.html) reference.
build.gradle
group 'com.example'

buildscript {
    repositories {
        maven { url 'http://asakusafw.s3.amazonaws.com/maven/releases' }
        maven { url 'http://asakusafw.s3.amazonaws.com/maven/snapshots' }
    }
    dependencies {
        classpath group: 'com.asakusafw.gradle', name: 'asakusa-distribution', version: '0.10.0'
    }
}

apply plugin: 'asakusafw-sdk'
apply plugin: 'asakusafw-organizer'
apply plugin: 'asakusafw-spark'
apply plugin: 'eclipse'

asakusafw {
    sdk.hive true
}

asakusafwOrganizer {
    hive.enabled true
    profiles.prod {
        assembly.into('.') {
            put 'src/dist/prod'
        }
    }
}
In the default Asakusa configuration file, the [Direct I/O](http://docs.asakusafw.com/latest/release/ja/html/directio/start-guide.html) file system path (`com.asakusafw.directio.root.fs.path`) has been changed from `target/testing/directio` to `directio`. With this setting, the Direct I/O root path in this sample scenario will be `hdfs:///user/asakusa/directio`.
src/dist/prod/core/conf/asakusa-resources.xml
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
        <name>com.asakusafw.runtime.core.Report.Delegate</name>
        <value>com.asakusafw.runtime.report.CommonsLoggingReport</value>
    </property>
    <property>
        <name>com.asakusafw.directio.root</name>
        <value>com.asakusafw.runtime.directio.hadoop.HadoopDataSource</value>
    </property>
    <property>
        <name>com.asakusafw.directio.root.path</name>
        <value>/</value>
    </property>
    <property>
        <name>com.asakusafw.directio.root.fs.path</name>
        <value>directio</value>
    </property>
</configuration>
Create a DMDL (Data Model Definition Language) script file to define the data model. This model assumes CSV format for the input and Parquet format for the output. To output in Parquet format, the Asakusa Framework feature called Direct I/O Hive is used; in addition to the Parquet format, it also supports the ORC format. The sales_date property below uses the Asakusa DATE type, which maps to DATE in the Hive standard mapping, but since Impala has no type equivalent to Hive's DATE type, Direct I/O Hive's [Mapping Type Conversion Function](http://docs.asakusafw.com/latest/release/ja/html/directio/using-hive.html#id19) is used to convert it to the TIMESTAMP type.
src/main/dmdl/sales.dmdl
@directio.csv
@directio.hive.parquet(
    table_name = "sales"
)
sales = {
    @directio.hive.timestamp
    sales_date : DATE;
    item_code : TEXT;
    amount : INT;
    selling_price : INT;
};
Run the following command to generate a data model class from DMDL.
gradle compileDMDL
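To check that the DMDL compilation worked, you can look for the generated data model classes (the `com.example.modelgen.dmdl` packages imported by the classes below). The path shown here assumes the Asakusa Gradle Plugin's default output location and may differ depending on the version.
$ ls build/generated-sources/modelgen/com/example/modelgen/dmdl/model/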
The input definition class (http://docs.asakusafw.com/latest/release/ja/html/directio/user-guide.html#dsl) reads the `sales.csv` file on the `input` path.
src/main/java/com/example/jobflow/SalesFromCSV.java
package com.example.jobflow;

import com.example.modelgen.dmdl.csv.AbstractSalesCsvInputDescription;

public class SalesFromCSV extends AbstractSalesCsvInputDescription {

    @Override
    public String getBasePath() {
        return "input";
    }

    @Override
    public String getResourcePattern() {
        return "sales.csv";
    }
}
The output definition class writes to the `result/sales` path with the file name pattern `sales.parquet.*`. If the file name contains a wildcard (*), each part of the distributed, parallel processing result is assigned a unique name and written in parallel, so a speedup can be expected.
src/main/java/com/example/jobflow/SalesToParquet.java
package com.example.jobflow;

import java.util.Arrays;
import java.util.List;

import com.example.modelgen.dmdl.hive.parquet.AbstractSalesParquetFileOutputDescription;

public class SalesToParquet extends AbstractSalesParquetFileOutputDescription {

    @Override
    public String getBasePath() {
        return "result/sales";
    }

    @Override
    public String getResourcePattern() {
        return "sales.parquet.*";
    }

    @Override
    public List<String> getDeletePatterns() {
        return Arrays.asList("*");
    }
}
In this sample scenario, we do not implement any operators; we only implement a job flow that connects the input to the output, and a batch class that executes that job flow.
src/main/java/com/example/jobflow/SalesParquetJob.java
package com.example.jobflow;

import com.asakusafw.vocabulary.flow.Export;
import com.asakusafw.vocabulary.flow.FlowDescription;
import com.asakusafw.vocabulary.flow.Import;
import com.asakusafw.vocabulary.flow.In;
import com.asakusafw.vocabulary.flow.JobFlow;
import com.asakusafw.vocabulary.flow.Out;
import com.example.modelgen.dmdl.model.Sales;

@JobFlow(name = "salesParquetJob")
public class SalesParquetJob extends FlowDescription {

    final In<Sales> salesIn;
    final Out<Sales> salesOut;

    public SalesParquetJob(
            @Import(name = "salesIn", description = SalesFromCSV.class)
            In<Sales> salesIn,
            @Export(name = "salesOut", description = SalesToParquet.class)
            Out<Sales> salesOut) {
        this.salesIn = salesIn;
        this.salesOut = salesOut;
    }

    @Override
    protected void describe() {
        salesOut.add(salesIn);
    }
}
src/main/java/com/example/batch/SalesParquetBatch.java
package com.example.batch;

import com.asakusafw.vocabulary.batch.Batch;
import com.asakusafw.vocabulary.batch.BatchDescription;
import com.example.jobflow.SalesParquetJob;

@Batch(name = "example.salesParquet")
public class SalesParquetBatch extends BatchDescription {

    @Override
    protected void describe() {
        run(SalesParquetJob.class).soon();
    }
}
Copy the deployment archive file created by the following command to the Hadoop cluster (to the `$HOME` path of the `asakusa` user).
gradle assemble
./build/asakusafw-asakusa-example-parquet.tar.gz
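As a reference, one way to copy the archive to the cluster is with scp; the host name `hadoop-master` below is only a placeholder for your cluster's login node.
$ scp ./build/asakusafw-asakusa-example-parquet.tar.gz asakusa@hadoop-master:~/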
Set the environment variables `ASAKUSA_HOME` and `SPARK_CMD` in `.bash_profile` etc., as shown in the example below. However, the `SPARK_CMD` environment variable does not need to be set if the `spark-submit` command is on the PATH.
.bash_profile
export ASAKUSA_HOME=${HOME}/asakusa
export SPARK_CMD=/opt/mapr/spark/spark-2.1.0/bin/spark-submit
Extract the deployment archive file under the path pointed to by the `ASAKUSA_HOME` environment variable and run the `setup.jar` command.
$ rm -r $ASAKUSA_HOME
$ mkdir $ASAKUSA_HOME
$ cd $ASAKUSA_HOME
$ tar xvf ~/asakusafw-asakusa-example-parquet.tar.gz
$ java -jar $ASAKUSA_HOME/tools/bin/setup.jar
Deploy the following randomly generated CSV file on HDFS.
sales.csv
2008-05-04,ilCQBVYBWSVOO,46,224
2001-02-28,nTMbJJqLzwYqw,4,208
2003-05-09,MASAMJmjlETfL,18,246
1996-04-18,RCXfHnTwpcqFS,50,249
2004-01-15,RqppmAoSuhamm,1,360
1994-01-02,kjVImLuoLaeQb,9,600
2013-08-22,JPQkeJNzMQtjI,5,250
1991-05-12,aLzNHOcSqcrys,22,785
1981-08-01,sGOCOObwYSbFr,21,813
2010-03-02,PZvFqxThHEnsX,21,198
$ hadoop fs -mkdir -p directio/input
$ hadoop fs -put sales.csv directio/input/
Run the application on Spark by passing the batch ID as an argument to YAESS. The result is output as a Parquet format file.
$ $ASAKUSA_HOME/yaess/bin/yaess-batch.sh spark.example.salesParquet
$ hadoop fs -ls directio/result/sales/
Found 1 items
-rwxr-xr-x 3 asakusa asakusa 25733 2018-02-15 09:01 directio/result/sales/sales.parquet.s0-p0
Generate the Impala DDL file with the Asakusa CLI generate command. In the following, the DDL is generated with a LOCATION added and with the table registered as an external table.
$ $ASAKUSA_HOME/bin/asakusa generate ddl hive --external --location /=hdfs:///user/asakusa/directio -o sales.sql spark.example.salesParquet
sales.sql
CREATE EXTERNAL TABLE sales (
sales_date TIMESTAMP ,
item_code STRING ,
amount INT ,
selling_price INT
)
STORED AS PARQUET
LOCATION 'hdfs:///user/asakusa/directio/result/sales';
Register the generated DDL file (`sales.sql`).
$ impala-shell -i localhost -f sales.sql
Execute a SQL query from the `impala-shell` command to check the operation.
$ impala-shell -i localhost
[localhost:21000] > select * from sales;
+---------------------+---------------+--------+---------------+
| sales_date | item_code | amount | selling_price |
+---------------------+---------------+--------+---------------+
| 2008-05-04 00:00:00 | ilCQBVYBWSVOO | 46 | 224 |
| 2001-02-28 00:00:00 | nTMbJJqLzwYqw | 4 | 208 |
| 2003-05-09 00:00:00 | MASAMJmjlETfL | 18 | 246 |
| 1996-04-18 00:00:00 | RCXfHnTwpcqFS | 50 | 249 |
| 2004-01-15 00:00:00 | RqppmAoSuhamm | 1 | 360 |
| 1994-01-02 00:00:00 | kjVImLuoLaeQb | 9 | 600 |
| 2013-08-22 00:00:00 | JPQkeJNzMQtjI | 5 | 250 |
| 1991-05-12 00:00:00 | aLzNHOcSqcrys | 22 | 785 |
| 1981-08-01 00:00:00 | sGOCOObwYSbFr | 21 | 813 |
| 2010-03-02 00:00:00 | PZvFqxThHEnsX | 21 | 198 |
+---------------------+---------------+--------+---------------+
Fetched 10 row(s) in 0.11s
You can refer to it from BI tools (Tableau, for example) by connecting with the `Impala ODBC Driver` provided by Cloudera.
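For example, a BI tool might issue an aggregation like the following, which totals the sales per year; this is just an illustrative query against the sales table above, shown here via `impala-shell -q`.
$ impala-shell -i localhost -q "
SELECT year(sales_date)   AS sales_year,
       SUM(amount)        AS total_amount,
       SUM(selling_price) AS total_selling_price
FROM sales
GROUP BY year(sales_date)
ORDER BY sales_year"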
This time, I introduced the Asakusa Framework's integration capability using Impala as an example. Any product that supports the Parquet or ORC format, such as Hive, can be linked in the same way, so I hope this will be helpful.