Im vorherigen Artikel Hallo Welt! Mit Asakusa Framework!, Asakusa Frameowk Ich habe erklärt, wie man implementiert (/index.html). In diesem Artikel möchte ich die Inhalte vorstellen, die näher am Geschäftsszenario liegen. In der folgenden Abbildung werden verschiedene Daten auf Hadoop (HDFS) gesammelt und mit Asakusa on Spark verarbeitet. Dies ist ein Beispiel für eine Datenanalyseplattform, die die generierten Daten mit BI-Tools visualisiert. Es wird angenommen, dass die Datenskala extrem große Daten wie z. B. mehrere zehn TB für mehrere PB verarbeitet. In diesem Artikel werde ich als Beispiel für eine SQL-Abfrage-Engine vorstellen, wie diese Elementartechnologien Asakusa auf Spark und Impala verknüpft werden.
Es wird davon ausgegangen, dass die Eingabedatei bereits auf Hadoops HDFS bereitgestellt wurde. Ursprünglich wird die Eingabedatei nach der Verarbeitung ausgegeben. In diesem Beispielszenario wird die Eingabedatei jedoch einfach in das Parkettformat auf HDFS konvertiert und ausgegeben. Definieren Sie die Ausgabedaten als externe Impala-Tabelle und beziehen Sie sich im BI-Tool darauf.
Das Verkaufsdatenmodell sieht folgendermaßen aus: Die Möglichkeit zur Konvertierung zwischen Asakusa-Eigenschaftsdatenformaten und Impala-Datenformaten wird in der DMDL-Definition beschrieben.
Datenelement | Asakusa-Datentyp | Impala-Datentyp |
---|---|---|
Verkaufsdatum (Verkauf_date) | DATE | TIMESTAMP |
Artikelcode (Artikel_code) | TEXT | STRING |
Verkaufsmenge (Menge) | INT | INT |
Verkaufsbetrag (Verkauf_price) | INT | INT |
Wir haben den Betrieb in der folgenden Umgebung bestätigt.
Bitte beachten Sie, dass Direct I / O Hive von Asakusa Framework 0.10 Hive 1.2.2 verwendet, sodass wir nicht die gesamte Datenkompatibilität zwischen Hive-Versionen überprüft haben.
Folgendes sollte als Vorarbeit abgeschlossen werden.
Erstellen Sie zunächst einen Projektordner. In diesem Beispiel werden wir an den folgenden Ordnern arbeiten.
mkdir asakusa-example-parquet
Erstellen Sie eine Gradle-Skriptdatei im Projektordner. Dieses Mal haben wir Asakusa im Spark Plugin so konfiguriert, dass Spark als Ausführungsmodul verwendet wird. Weitere Informationen zu dem von Ihnen erstellten Gradle-Skript finden Sie in der Referenz Asakusa Gradle Plugin.
build.gradle
group 'com.example'
buildscript {
repositories {
maven { url 'http://asakusafw.s3.amazonaws.com/maven/releases' }
maven { url 'http://asakusafw.s3.amazonaws.com/maven/snapshots' }
}
dependencies {
classpath group: 'com.asakusafw.gradle', name: 'asakusa-distribution', version: '0.10.0'
}
}
apply plugin: 'asakusafw-sdk'
apply plugin: 'asakusafw-organizer'
apply plugin: 'asakusafw-spark'
apply plugin: 'eclipse'
asakusafw {
sdk.hive true
}
asakusafwOrganizer {
hive.enabled true
profiles.prod {
assembly.into('.') {
put 'src/dist/prod'
}
}
}
Aus der Standard-Asakusa-Konfigurationsdatei der Dateisystempfad (com.asakusafw.directio] von [Direct I / O](http://docs.asakusafw.com/latest/release/ja/html/directio/start-guide.html) .root.fs.path
) wurde von target / testing / directio
in directio
geändert.
Mit dieser Einstellung lautet der direkte E / A-Stammpfad in diesem Beispielszenario "hdfs: /// user / asakusa / directio".
src/dist/prod/core/conf/asakusa-resources.xml
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>com.asakusafw.runtime.core.Report.Delegate</name>
<value>com.asakusafw.runtime.report.CommonsLoggingReport</value>
</property>
<property>
<name>com.asakusafw.directio.root</name>
<value>com.asakusafw.runtime.directio.hadoop.HadoopDataSource</value>
</property>
<property>
<name>com.asakusafw.directio.root.path</name>
<value>/</value>
</property>
<property>
<name>com.asakusafw.directio.root.fs.path</name>
<value>directio</value>
</property>
</configuration>
DMDL DMDL (Data Model Definition Language) Erstellen Sie eine Skriptdatei, um das Datenmodell zu definieren. Dieses Modell verwendet das CSV-Format für die Eingabe und das Parkettformat für die Ausgabe. Verwenden Sie bei der Ausgabe im Parkettformat die Asakusa Framework-Funktion Direct I / O Hive. .. Neben dem Parkettformat wird auch das ORC-Format unterstützt. Die folgende sales_date-Eigenschaft ist dasselbe DATE wie der Asakusa-Datentyp in der Hive-Standardzuordnung. Da Impala jedoch keinen Typ hat, der dem Hive-DATE-Typ entspricht, kann Direct I / O Hive [Konvertierungsfunktion für Zuordnungstyp](http: //docs.asakusafw.com/latest/release/ja/html/directio/using-hive.html#id19) wird zum Konvertieren in den TIMESTAMP-Typ verwendet.
src/main/dmdl/sales.dmdl
@directio.csv
@directio.hive.parquet(
table_name = "sales"
)
sales = {
@directio.hive.timestamp
sales_date : DATE;
item_code : TEXT;
amount : INT;
selling_price : INT;
};
Führen Sie den folgenden Befehl aus, um eine Datenmodellklasse aus DMDL zu generieren.
gradle compileDMDL
Die Eingabedefinitionsklasse (http://docs.asakusafw.com/latest/release/ja/html/directio/user-guide.html#dsl) ist die Datei sales.csv im Eingabepfad.
src/main/java/com/example/jobflow/SalesFromCSV.java
package com.example.jobflow;
import com.example.modelgen.dmdl.csv.AbstractSalesCsvInputDescription;
public class SalesFromCSV extends AbstractSalesCsvInputDescription {
@Override
public String getBasePath() {
return "input";
}
@Override
public String getResourcePattern() {
return "sales.csv";
}
}
Ausgabedefinitionsklasse befindet sich auf dem Pfad "Ergebnis / Verkauf". Ausgabe mit dem Dateinamen "sales.parquet. ". Wenn der Dateiname einen Platzhalter () enthält, wird dem Ergebnis der verteilten Parallelverarbeitung ein eindeutiger Name zugewiesen, während es geteilt und parallel ausgegeben wird, sodass eine Beschleunigung zu erwarten ist.
src/main/java/com/example/jobflow/SalesToParquet.java
package com.example.jobflow;
import java.util.Arrays;
import java.util.List;
import com.example.modelgen.dmdl.hive.parquet.AbstractSalesParquetFileOutputDescription;
public class SalesToParquet extends AbstractSalesParquetFileOutputDescription {
@Override
public String getBasePath() {
return "result/sales";
}
@Override
public String getResourcePattern() {
return "sales.parquet.*";
}
@Override
public List<String> getDeletePatterns() {
return Arrays.asList("*");
}
}
In diesem Beispielszenario implementieren wir den Operator nicht, daher implementieren wir nur den Jobfluss, der von der Eingabe zur Ausgabe verbunden ist, und die Stapelklasse, die den Jobfluss ausführt.
src/main/java/com/example/jobflow/SalesParquetJob.java
package com.example.jobflow;
import com.asakusafw.vocabulary.flow.Export;
import com.asakusafw.vocabulary.flow.FlowDescription;
import com.asakusafw.vocabulary.flow.Import;
import com.asakusafw.vocabulary.flow.In;
import com.asakusafw.vocabulary.flow.JobFlow;
import com.asakusafw.vocabulary.flow.Out;
import com.example.modelgen.dmdl.model.Sales;
@JobFlow(name = "salesParquetJob")
public class SalesParquetJob extends FlowDescription {
final In<Sales> salesIn;
final Out<Sales> salesOut;
public SalesParquetJob(
@Import(name = "salesIn", description = SalesFromCSV.class)
In<Sales> salesIn,
@Export(name = "salesOut", description = SalesToParquet.class)
Out<Sales> salesOut) {
this.salesIn = salesIn;
this.salesOut = salesOut;
}
@Override
protected void describe() {
salesOut.add(salesIn);
}
}
src/main/java/com/example/batch/SalesParquetBatch.java
package com.example.batch;
import com.asakusafw.vocabulary.batch.Batch;
import com.asakusafw.vocabulary.batch.BatchDescription;
import com.example.jobflow.SalesParquetJob;
@Batch(name = "example.salesParquet")
public class SalesParquetBatch extends BatchDescription {
@Override
protected void describe() {
run(SalesParquetJob.class).soon();
}
}
Kopieren Sie die mit dem folgenden Befehl erstellte Bereitstellungsarchivdatei in den Hadoop-Cluster (im Pfad "$ HOME" des Benutzers "asakusa").
gradle assemble
./build/asakusafw-asakusa-example-parquet.tar.gz
Stellen Sie die Umgebungsvariablen "ASAKUSA_HOME" und "SPARK_CMD" in ".bash_profile" usw. ein, wie im folgenden Beispiel gezeigt. Befindet sich der Befehl "spark-submit" jedoch im Pfad, muss die Umgebungsvariable "SPARK_CMD" nicht festgelegt werden.
.bash_profile
export ASAKUSA_HOME=${HOME}/asakusa
export SPARK_CMD=/opt/mapr/spark/spark-2.1.0/bin/spark-submit
Erweitern Sie die Bereitstellungsarchivdatei im Pfad der Umgebungsvariablen "ASAKUSA_HOME" und führen Sie den Befehl setup.jar aus.
$ rm -r $ASAKUSA_HOME
$ mkdir $ASAKUSA_HOME
$ cd $ASAKUSA_HOME
$ tar xvf ~/asakusafw-asakusa-example-parquet.tar.gz
$ java -jar $ASAKUSA_HOME/tools/bin/setup.jar
Stellen Sie die folgende zufällig generierte CSV-Datei auf HDFS bereit.
sales.csv
2008-05-04,ilCQBVYBWSVOO,46,224
2001-02-28,nTMbJJqLzwYqw,4,208
2003-05-09,MASAMJmjlETfL,18,246
1996-04-18,RCXfHnTwpcqFS,50,249
2004-01-15,RqppmAoSuhamm,1,360
1994-01-02,kjVImLuoLaeQb,9,600
2013-08-22,JPQkeJNzMQtjI,5,250
1991-05-12,aLzNHOcSqcrys,22,785
1981-08-01,sGOCOObwYSbFr,21,813
2010-03-02,PZvFqxThHEnsX,21,198
$ hadoop fs -mkdir -p directio/input
$ hadoop fs -put sales.csv directio/input/
Führen Sie die Anwendung unter Spark mit der Stapel-ID als Argument in [YAESS] aus (http://docs.asakusafw.com/latest/release/ja/html/yaess/index.html). Die Ergebnisdatei wird als Parkettdatei ausgegeben.
$ $ASAKUSA_HOME/yaess/bin/yaess-batch.sh spark.example.salesParquet
$ hadoop fs -ls directio/result/sales/
Found 1 items
-rwxr-xr-x 3 asakusa asakusa 25733 2018-02-15 09:01 directio/result/sales/sales.parquet.s0-p0
[a
Generieren Sie eine Impala-DDL-Datei aus Asakusa CLI-Generierungsbefehl. Im Folgenden wird DDL generiert, indem das Hinzufügen von LOCATION und die Einstellung angegeben werden, die als externe Tabelle registriert werden soll.
$ $ASAKUSA_HOME/bin/asakusa generate ddl hive --external --location /=hdfs:///user/asakusa/directio -o sales.sql spark.example.salesParquet
sales.sql
CREATE EXTERNAL TABLE sales (
sales_date TIMESTAMP ,
item_code STRING ,
amount INT ,
selling_price INT
)
STORED AS PARQUET
LOCATION 'hdfs:///user/asakusa/directio/result/sales';
Registrieren Sie die generierte DDL-Datei (sales.sql
).
$ impala-shell -i localhost -f sales.sql
Führen Sie eine SQL-Abfrage mit dem Befehl impala-shell
aus, um den Vorgang zu überprüfen.
$ impala-shell -i localhost
[localhost:21000] > select * from sales;
+---------------------+---------------+--------+---------------+
| sales_date | item_code | amount | selling_price |
+---------------------+---------------+--------+---------------+
| 2008-05-04 00:00:00 | ilCQBVYBWSVOO | 46 | 224 |
| 2001-02-28 00:00:00 | nTMbJJqLzwYqw | 4 | 208 |
| 2003-05-09 00:00:00 | MASAMJmjlETfL | 18 | 246 |
| 1996-04-18 00:00:00 | RCXfHnTwpcqFS | 50 | 249 |
| 2004-01-15 00:00:00 | RqppmAoSuhamm | 1 | 360 |
| 1994-01-02 00:00:00 | kjVImLuoLaeQb | 9 | 600 |
| 2013-08-22 00:00:00 | JPQkeJNzMQtjI | 5 | 250 |
| 1991-05-12 00:00:00 | aLzNHOcSqcrys | 22 | 785 |
| 1981-08-01 00:00:00 | sGOCOObwYSbFr | 21 | 813 |
| 2010-03-02 00:00:00 | PZvFqxThHEnsX | 21 | 198 |
+---------------------+---------------+--------+---------------+
Fetched 10 row(s) in 0.11s
Sie können in BI-Tools (z. B. Tableau) darauf zugreifen, indem Sie eine Verbindung mit dem von Cloudera bereitgestellten Impala ODBC-Treiber herstellen.
Dieses Mal habe ich die Verknüpfungsfunktion von Asakusa Framework am Beispiel von Impala eingeführt. Jedes Produkt wie Hive, das das Parkettformat oder das ORC-Format unterstützt, kann verlinkt werden. Ich hoffe, es ist hilfreich.