Dans l'article précédent Hello, World! With Asakusa Framework!, Asakusa Frameowk J'ai expliqué comment implémenter (/index.html). Dans cet article, je voudrais présenter les contenus qui sont plus proches du scénario commercial. Dans la figure ci-dessous, diverses données sont accumulées sur Hadoop (HDFS) et traitées avec Asakusa on Spark. Ceci est un exemple de plateforme d'analyse de données qui visualise les données générées avec des outils de BI. L'échelle de données est supposée traiter des données extrêmement volumineuses telles que des dizaines de TB à plusieurs PB. Dans cet article, je vais vous présenter comment lier ces technologies élémentaires Asakusa sur Spark et Impala comme exemple de moteur de requête SQL.
Il est supposé que le fichier d'entrée a déjà été déployé sur le HDFS de Hadoop. À l'origine, le fichier d'entrée est sorti après le traitement, mais dans cet exemple de scénario, le fichier d'entrée est simplement converti au format Parquet sur HDFS et en sortie. Définissez les données de sortie comme une table externe Impala et faites-y référence à partir de l'outil BI.
Le modèle de données de vente ressemble à ceci: La possibilité de convertir entre les formats de données de propriété Asakusa et les formats de données Impala est décrite dans la définition DMDL.
élément de données | Type de données Asakusa | Type de données Impala |
---|---|---|
Date de vente (ventes_date) | DATE | TIMESTAMP |
Code article (article_code) | TEXT | STRING |
Quantité de vente (montant) | INT | INT |
Montant des ventes (vente_price) | INT | INT |
Nous avons confirmé l'opération dans l'environnement suivant.
Veuillez noter que Direct I / O Hive d'Asakusa Framework 0.10 utilise Hive 1.2.2, nous n'avons donc pas vérifié la compatibilité de toutes les données entre les versions de Hive.
Ce qui suit devrait être complété comme un travail préliminaire.
Tout d'abord, créez un dossier de projet. Dans cet exemple, nous travaillerons sur les dossiers suivants.
mkdir asakusa-example-parquet
Créez un fichier de script Gradle dans le dossier du projet. Cette fois, nous avons configuré Asakusa sur Spark Plugin pour utiliser Spark comme moteur d'exécution. Pour plus d'informations sur le script Gradle que vous avez créé, consultez la référence Asakusa Gradle Plugin.
build.gradle
group 'com.example'
buildscript {
repositories {
maven { url 'http://asakusafw.s3.amazonaws.com/maven/releases' }
maven { url 'http://asakusafw.s3.amazonaws.com/maven/snapshots' }
}
dependencies {
classpath group: 'com.asakusafw.gradle', name: 'asakusa-distribution', version: '0.10.0'
}
}
apply plugin: 'asakusafw-sdk'
apply plugin: 'asakusafw-organizer'
apply plugin: 'asakusafw-spark'
apply plugin: 'eclipse'
asakusafw {
sdk.hive true
}
asakusafwOrganizer {
hive.enabled true
profiles.prod {
assembly.into('.') {
put 'src/dist/prod'
}
}
}
À partir du fichier de configuration Asakusa par défaut, le chemin du système de fichiers (com.asakusafw.directio] de [Direct I / O](http://docs.asakusafw.com/latest/release/ja/html/directio/start-guide.html) .root.fs.path
) a été changé de target / testing / directio
à directio
.
Avec ce paramètre, le chemin d'accès racine d'E / S directes sera hdfs: /// user / asakusa / directio
dans cet exemple de scénario.
src/dist/prod/core/conf/asakusa-resources.xml
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>com.asakusafw.runtime.core.Report.Delegate</name>
<value>com.asakusafw.runtime.report.CommonsLoggingReport</value>
</property>
<property>
<name>com.asakusafw.directio.root</name>
<value>com.asakusafw.runtime.directio.hadoop.HadoopDataSource</value>
</property>
<property>
<name>com.asakusafw.directio.root.path</name>
<value>/</value>
</property>
<property>
<name>com.asakusafw.directio.root.fs.path</name>
<value>directio</value>
</property>
</configuration>
DMDL DMDL (Data Model Definition Language) Créez un fichier de script pour définir le modèle de données. Ce modèle suppose le format CSV pour l'entrée et le format Parquet pour la sortie. Lors de la sortie au format Parquet, utilisez la fonction Asakusa Framework appelée Direct I / O Hive. .. En plus du format Parquet, il prend également en charge le format ORC. La propriété sales_date suivante est la même DATE que le type de données Asakusa dans le mappage standard Hive, mais comme Impala n'a pas de type équivalent au type Hive DATE, Direct I / O Hive's [Mapping type conversion function](http: //docs.asakusafw.com/latest/release/ja/html/directio/using-hive.html#id19) est utilisé pour convertir en type TIMESTAMP.
src/main/dmdl/sales.dmdl
@directio.csv
@directio.hive.parquet(
table_name = "sales"
)
sales = {
@directio.hive.timestamp
sales_date : DATE;
item_code : TEXT;
amount : INT;
selling_price : INT;
};
Exécutez la commande suivante pour générer une classe de modèle de données à partir de DMDL.
gradle compileDMDL
La classe de définition d'entrée (http://docs.asakusafw.com/latest/release/ja/html/directio/user-guide.html#dsl) est le fichier sales.csv sur le chemin d'entrée.
src/main/java/com/example/jobflow/SalesFromCSV.java
package com.example.jobflow;
import com.example.modelgen.dmdl.csv.AbstractSalesCsvInputDescription;
public class SalesFromCSV extends AbstractSalesCsvInputDescription {
@Override
public String getBasePath() {
return "input";
}
@Override
public String getResourcePattern() {
return "sales.csv";
}
}
Classe de définition de sortie se trouve sur le chemin result / sales
. Sortie avec le nom de fichier sales.parquet. *
. S'il y a un caractère générique (*) dans le nom de fichier, le résultat du traitement parallèle distribué se verra attribuer un nom unique tout en étant divisé et sorti en parallèle, donc une accélération peut être attendue.
src/main/java/com/example/jobflow/SalesToParquet.java
package com.example.jobflow;
import java.util.Arrays;
import java.util.List;
import com.example.modelgen.dmdl.hive.parquet.AbstractSalesParquetFileOutputDescription;
public class SalesToParquet extends AbstractSalesParquetFileOutputDescription {
@Override
public String getBasePath() {
return "result/sales";
}
@Override
public String getResourcePattern() {
return "sales.parquet.*";
}
@Override
public List<String> getDeletePatterns() {
return Arrays.asList("*");
}
}
Dans cet exemple de scénario, nous n'implémentons pas l'opérateur, nous implémentons donc uniquement le flux de travaux connecté de l'entrée à la sortie et la classe de lots qui exécute le flux de travaux.
src/main/java/com/example/jobflow/SalesParquetJob.java
package com.example.jobflow;
import com.asakusafw.vocabulary.flow.Export;
import com.asakusafw.vocabulary.flow.FlowDescription;
import com.asakusafw.vocabulary.flow.Import;
import com.asakusafw.vocabulary.flow.In;
import com.asakusafw.vocabulary.flow.JobFlow;
import com.asakusafw.vocabulary.flow.Out;
import com.example.modelgen.dmdl.model.Sales;
@JobFlow(name = "salesParquetJob")
public class SalesParquetJob extends FlowDescription {
final In<Sales> salesIn;
final Out<Sales> salesOut;
public SalesParquetJob(
@Import(name = "salesIn", description = SalesFromCSV.class)
In<Sales> salesIn,
@Export(name = "salesOut", description = SalesToParquet.class)
Out<Sales> salesOut) {
this.salesIn = salesIn;
this.salesOut = salesOut;
}
@Override
protected void describe() {
salesOut.add(salesIn);
}
}
src/main/java/com/example/batch/SalesParquetBatch.java
package com.example.batch;
import com.asakusafw.vocabulary.batch.Batch;
import com.asakusafw.vocabulary.batch.BatchDescription;
import com.example.jobflow.SalesParquetJob;
@Batch(name = "example.salesParquet")
public class SalesParquetBatch extends BatchDescription {
@Override
protected void describe() {
run(SalesParquetJob.class).soon();
}
}
Copiez le fichier d'archive de déploiement créé par la commande suivante dans le cluster Hadoop (sur le chemin $ HOME
de l'utilisateur ʻasakusa`).
gradle assemble
./build/asakusafw-asakusa-example-parquet.tar.gz
Définissez les variables d'environnement ʻASAKUSA_HOME et
SPARK_CMDdans
.bash_profileetc. comme indiqué dans l'exemple ci-dessous. Cependant, si la commande
spark-submit est sur le chemin, la variable d'environnement
SPARK_CMD` n'a pas besoin d'être définie.
.bash_profile
export ASAKUSA_HOME=${HOME}/asakusa
export SPARK_CMD=/opt/mapr/spark/spark-2.1.0/bin/spark-submit
«Extrayez le fichier d'archive de déploiement sur le chemin de la variable d'environnement ASAKUSA_HOME» et exécutez la commande setup.jar.
$ rm -r $ASAKUSA_HOME
$ mkdir $ASAKUSA_HOME
$ cd $ASAKUSA_HOME
$ tar xvf ~/asakusafw-asakusa-example-parquet.tar.gz
$ java -jar $ASAKUSA_HOME/tools/bin/setup.jar
Déployez le fichier CSV généré aléatoirement suivant sur HDFS.
sales.csv
2008-05-04,ilCQBVYBWSVOO,46,224
2001-02-28,nTMbJJqLzwYqw,4,208
2003-05-09,MASAMJmjlETfL,18,246
1996-04-18,RCXfHnTwpcqFS,50,249
2004-01-15,RqppmAoSuhamm,1,360
1994-01-02,kjVImLuoLaeQb,9,600
2013-08-22,JPQkeJNzMQtjI,5,250
1991-05-12,aLzNHOcSqcrys,22,785
1981-08-01,sGOCOObwYSbFr,21,813
2010-03-02,PZvFqxThHEnsX,21,198
$ hadoop fs -mkdir -p directio/input
$ hadoop fs -put sales.csv directio/input/
Exécutez l'application sur Spark avec l'ID de lot comme argument dans YAESS. Le fichier résultat est généré sous forme de fichier au format parquet.
$ $ASAKUSA_HOME/yaess/bin/yaess-batch.sh spark.example.salesParquet
$ hadoop fs -ls directio/result/sales/
Found 1 items
-rwxr-xr-x 3 asakusa asakusa 25733 2018-02-15 09:01 directio/result/sales/sales.parquet.s0-p0
[a
Générez un fichier DDL Impala à partir de la commande Asakusa CLI generate. Dans ce qui suit, DDL est généré en spécifiant l'ajout de LOCATION et le paramètre à enregistrer en tant que table externe.
$ $ASAKUSA_HOME/bin/asakusa generate ddl hive --external --location /=hdfs:///user/asakusa/directio -o sales.sql spark.example.salesParquet
sales.sql
CREATE EXTERNAL TABLE sales (
sales_date TIMESTAMP ,
item_code STRING ,
amount INT ,
selling_price INT
)
STORED AS PARQUET
LOCATION 'hdfs:///user/asakusa/directio/result/sales';
Enregistrez le fichier DDL généré (sales.sql
).
$ impala-shell -i localhost -f sales.sql
Exécutez une requête SQL à partir de la commande ʻimpala-shell` pour vérifier l'opération.
$ impala-shell -i localhost
[localhost:21000] > select * from sales;
+---------------------+---------------+--------+---------------+
| sales_date | item_code | amount | selling_price |
+---------------------+---------------+--------+---------------+
| 2008-05-04 00:00:00 | ilCQBVYBWSVOO | 46 | 224 |
| 2001-02-28 00:00:00 | nTMbJJqLzwYqw | 4 | 208 |
| 2003-05-09 00:00:00 | MASAMJmjlETfL | 18 | 246 |
| 1996-04-18 00:00:00 | RCXfHnTwpcqFS | 50 | 249 |
| 2004-01-15 00:00:00 | RqppmAoSuhamm | 1 | 360 |
| 1994-01-02 00:00:00 | kjVImLuoLaeQb | 9 | 600 |
| 2013-08-22 00:00:00 | JPQkeJNzMQtjI | 5 | 250 |
| 1991-05-12 00:00:00 | aLzNHOcSqcrys | 22 | 785 |
| 1981-08-01 00:00:00 | sGOCOObwYSbFr | 21 | 813 |
| 2010-03-02 00:00:00 | PZvFqxThHEnsX | 21 | 198 |
+---------------------+---------------+--------+---------------+
Fetched 10 row(s) in 0.11s
Vous pouvez vous y référer à partir des outils BI (Tableau, par exemple) en vous connectant avec ʻImpala ODBC Driver` fourni par Cloudera.
Cette fois, j'ai présenté la fonction de liaison d'Asakusa Framework en utilisant Impala comme exemple. Tout produit tel que Hive qui prend en charge le format Parquet ou le format ORC peut être lié, j'espère donc que cela vous sera utile.