Ich wollte Spark vom Bildschirm des in Java erstellten WEB aus starten, daher möchte ich dieses Mal Spark mit dem AWS SDK für Java einreichen. Das ist es! Ich konnte den Artikel nicht finden, also werde ich ihn auf Qiita belassen.
Diesmal habe ich einen Job gemacht, der nur CSV-Daten in Parkett konvertiert. Starten Sie den Cluster zunächst mit EMR und führen Sie ihn über die Verwaltungskonsole aus. Als nächstes möchte ich Folgendes automatisieren.
Die Quelle ist hier. https://github.com/uzresk/spark-samples.git
Empfängt die CSV-Bucket-URL und die Parkett-Ausgabeziel-URL als Argumente. Das Schema wird automatisch konvertiert, der Datenrahmen wird mit dem Header "Ja" gelesen und in Parkett konvertiert.
Java
Erstellen Sie eine JAR-Datei mit dem MVN-Paket.
Csv2Parquet.java
package jp.gr.java_conf.uzresk.samples.spark.jobs;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
public class Csv2Parquet {
public static void main(String[] args) {
String input = args[0];
String output = args[1];
SparkSession spark = SparkSession.builder().appName("Csv2ParquetJavaApplication").getOrCreate();
Dataset<Row> df = spark.read().option("header", "true").option("inferSchema", "true").csv(input);
df.write().mode(SaveMode.Overwrite).parquet(output);
}
}
Scala
Als Bonus habe ich es in Scala geschrieben. Bitte mit sbt verpacken.
Csv2Parquet.scala
package jp.gr.java_conf.uzresk.samples.spark.jobs
import org.apache.spark.sql.{SaveMode, SparkSession}
object Csv2Parquet {
def main(args: Array[String]) {
val input = args(0)
val output = args(1)
val spark = SparkSession.builder.appName("Csv2ParquetJavaAppliation").getOrCreate
val df = spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv(input)
df.write.mode(SaveMode.Overwrite).parquet(output)
}
}
Worüber sollten Sie sich beim Starten eines Clusters Gedanken machen?
Diesmal habe ich es mit ca. 2GB Daten versucht. Die Konvertierung wird in 2 Minuten mit 2 Kernknoten abgeschlossen.
Fügen Sie nach dem Drücken der Bootcluster-Schaltfläche weitere Schritte hinzu.
Wenn dies fehlschlägt, können Sie die Standardfehlerausgabe überprüfen, indem Sie auf den Pfeil klicken. Bitte beachten Sie, dass hier nichts angezeigt wird, wenn Sie das Protokoll nicht festgelegt haben.
Definieren Sie JOB in StepConfig, geben Sie die Cluster-ID an und fügen Sie STEP hinzu. Da es mit dem auf dem Bildschirm eingestellten Inhalt identisch ist, gibt es meines Erachtens nicht viel Zögern.
SparkSubmitSingleStepRunningCluster.java
package jp.gr.java_conf.uzresk.samples.spark.emr;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder;
import com.amazonaws.services.elasticmapreduce.model.*;
import jp.gr.java_conf.uzresk.samples.spark.emr.utils.AmazonClientConfigurationBuilder;
import jp.gr.java_conf.uzresk.samples.spark.emr.utils.Resource;
import java.util.ArrayList;
import java.util.List;
public class SparkSubmitSingleStepRunningCluster {
private static final String CLUSTER_ID = "j-1VOWW7554GD9Z";
public static void main(String[] args) {
new SparkSubmitSingleStepRunningCluster().startJob();
}
private void startJob() {
ClientConfiguration cc = AmazonClientConfigurationBuilder.clientConfiguration().orElse(new ClientConfiguration());
AmazonElasticMapReduce emr = AmazonElasticMapReduceClientBuilder.standard()
.withClientConfiguration(cc)
.withRegion(Regions.AP_NORTHEAST_1)
.build();
List<StepConfig> stepConfigs = new ArrayList<>();
stepConfigs.add(createCsv2ParquetConfig());
AddJobFlowStepsRequest req = new AddJobFlowStepsRequest(CLUSTER_ID, stepConfigs);
AddJobFlowStepsResult result = emr.addJobFlowSteps(req);
System.out.println(result.getStepIds());
}
private StepConfig createCsv2ParquetConfig() {
HadoopJarStepConfig sparkStepConf = new HadoopJarStepConfig()
.withJar("command-runner.jar")
.withArgs("spark-submit",
"--executor-memory", "1g",
"--deploy-mode", "cluster",
"--class", Resource.getString("job.class"),
Resource.getString("job.jar"),
Resource.getString("csv2parquet.csv-file"),
Resource.getString("csv2parquet.parquet-path"));
return new StepConfig()
.withName("Csv2Parquet")
.withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
.withHadoopJarStep(sparkStepConf);
}
}
Ich werde es so schreiben. Es gibt viele Parameter zum Starten des EMR-Clusters, aber ansonsten ist es dasselbe wie im vorherigen Beispiel.
SparkSubmitIncludingStartupCluster.java
package jp.gr.java_conf.uzresk.samples.spark.emr;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder;
import com.amazonaws.services.elasticmapreduce.model.*;
import jp.gr.java_conf.uzresk.samples.spark.emr.utils.AmazonClientConfigurationBuilder;
import jp.gr.java_conf.uzresk.samples.spark.emr.utils.Resource;
import java.util.ArrayList;
import java.util.List;
import static com.amazonaws.services.elasticmapreduce.model.InstanceRoleType.CORE;
import static com.amazonaws.services.elasticmapreduce.model.InstanceRoleType.MASTER;
public class SparkSubmitIncludingStartupCluster {
public static void main(String[] args) {
ClientConfiguration cc = AmazonClientConfigurationBuilder.clientConfiguration().orElse(new ClientConfiguration());
AmazonElasticMapReduce emr = AmazonElasticMapReduceClientBuilder.standard()
.withClientConfiguration(cc)
.withRegion(Regions.AP_NORTHEAST_1)
.build();
new SparkSubmitIncludingStartupCluster().launch(emr);
}
private void launch(AmazonElasticMapReduce emr) {
RunJobFlowRequest request = createRequest();
RunJobFlowResult result = emr.runJobFlow(request);
System.out.println(result.getJobFlowId() + " is starting");
}
private RunJobFlowRequest createRequest() {
return new RunJobFlowRequest()
.withName(Resource.getString("emr.cluster-name"))
.withReleaseLabel(Resource.getString("emr.release-label"))
.withSteps(createStepConfig())
.withApplications(createApplicationList())
.withTags(new Tag("Name", Resource.getString("emr.cluster-name")))
.withEbsRootVolumeSize(Resource.getInt("emr.ebs-root-volume-size"))
.withServiceRole(Resource.getString("emr.service-role"))
.withAutoScalingRole(Resource.getString("emr.auto-scaling-role"))
.withJobFlowRole(Resource.getString("emr.job-flow-role"))
.withLogUri(Resource.getString("emr.log-uri"))
.withVisibleToAllUsers(true)
.withInstances(createJobFlowInstancesConfig(createMasterConfig(), createCoreConfig()));
}
private List<StepConfig> createStepConfig() {
List<StepConfig> configs = new ArrayList<>();
configs.add(createDebugConfig());
configs.add(createCsv2ParquetConfig());
return configs;
}
private StepConfig createDebugConfig() {
String COMMAND_RUNNER = "command-runner.jar";
String DEBUGGING_COMMAND = "state-pusher-script";
String DEBUGGING_NAME = "Setup Hadoop Debugging";
return new StepConfig()
.withName(DEBUGGING_NAME)
.withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
.withHadoopJarStep(
new HadoopJarStepConfig().withJar(COMMAND_RUNNER).withArgs(DEBUGGING_COMMAND));
}
private StepConfig createCsv2ParquetConfig() {
HadoopJarStepConfig sparkStepConf = new HadoopJarStepConfig()
.withJar("command-runner.jar")
.withArgs("spark-submit",
"--executor-memory", "1g",
"--deploy-mode", "cluster",
"--class", Resource.getString("job.class"),
Resource.getString("job.jar"),
Resource.getString("csv2parquet.csv-file"),
Resource.getString("csv2parquet.parquet-path"));
return new StepConfig()
.withName("Csv2Parquet")
.withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
.withHadoopJarStep(sparkStepConf);
}
private ArrayList<Application> createApplicationList() {
ArrayList<Application> applications = new ArrayList<>();
applications.add(new Application().withName("Spark"));
return applications;
}
private InstanceGroupConfig createMasterConfig() {
return new InstanceGroupConfig()
.withInstanceCount(Resource.getInt("emr.master.instance-count"))
.withEbsConfiguration(
new EbsConfiguration()
.withEbsBlockDeviceConfigs(
new EbsBlockDeviceConfig()
.withVolumeSpecification(
new VolumeSpecification().withSizeInGB(Resource.getInt("emr.master.volume-size")).withVolumeType(Resource.getString("emr.master.volume-type")))
.withVolumesPerInstance(1))
.withEbsOptimized(true))
.withInstanceRole(MASTER)
.withInstanceType(Resource.getString("emr.master.instance-type"))
.withName(Resource.getString("emr.master.name"));
}
private InstanceGroupConfig createCoreConfig() {
return new InstanceGroupConfig()
.withInstanceCount(Resource.getInt("emr.core.instance-count"))
.withEbsConfiguration(
new EbsConfiguration()
.withEbsBlockDeviceConfigs(
new EbsBlockDeviceConfig()
.withVolumeSpecification(
new VolumeSpecification().withSizeInGB(Resource.getInt("emr.core.volume-size")).withVolumeType(Resource.getString("emr.core.volume-type")))
.withVolumesPerInstance(1))
.withEbsOptimized(true))
.withInstanceRole(CORE)
.withInstanceType(Resource.getString("emr.core.instance-type"))
.withName(Resource.getString("emr.core.name"));
}
private JobFlowInstancesConfig createJobFlowInstancesConfig(
InstanceGroupConfig masterConfiguration, InstanceGroupConfig coreConfiguration) {
return new JobFlowInstancesConfig()
.withInstanceGroups(masterConfiguration, coreConfiguration)
.withEc2SubnetId(Resource.getString("emr.subnet-id"))
.withEmrManagedMasterSecurityGroup(Resource.getString("emr.master-sg"))
.withEmrManagedSlaveSecurityGroup(Resource.getString("emr.slave-sg"))
.withEc2KeyName(Resource.getString("emr.key-name"))
.withKeepJobFlowAliveWhenNoSteps(fales); //Stoppen Sie die Instanz am Ende des Jobs
}
}
Wenn Sie in der letzten Zeile withKeepJobFlowAliveWhenNoSteps auf false setzen, wird der Cluster nach Ausführung von JOB automatisch gestoppt.
Die Stapelverarbeitung kann mithilfe von Spot-Instanzen noch kostengünstiger ausgeführt werden. Aus Erfahrung wird es weniger als die Hälfte des Preises sein.
Ich verwende eine m4.xlarge-On-Demand-Instanz für 0,258 US-Dollar. Der Preis für Spot-Instanzen schien zwischen 0,06 und 0,10 zu liegen. Stellen Sie den Preis also auf 0,10 USD ein und starten Sie ihn. (Weil ich es diesmal sicher bewegen wollte)
Wenn Sie SDK verwenden, können Sie es verschieben, indem Sie jeweils 2 Zeilen für MASTER und CORE hinzufügen.
SparkSubmitIncludingStartupSpotCluster.java
private InstanceGroupConfig createMasterConfig() {
return new InstanceGroupConfig()
.withBidPrice("0.10") // $0.10
.withMarket(MarketType.SPOT) //Verwenden Sie die Spot-Instanz
.withInstanceCount(Resource.getInt("emr.master.instance-count"))
.withEbsConfiguration(
new EbsConfiguration()
.withEbsBlockDeviceConfigs(
new EbsBlockDeviceConfig()
.withVolumeSpecification(
new VolumeSpecification().withSizeInGB(Resource.getInt("emr.master.volume-size")).withVolumeType(Resource.getString("emr.master.volume-type")))
.withVolumesPerInstance(1))
.withEbsOptimized(true))
.withInstanceRole(MASTER)
.withInstanceType(Resource.getString("emr.master.instance-type"))
.withName(Resource.getString("emr.master.name"));
}
private InstanceGroupConfig createCoreConfig() {
return new InstanceGroupConfig()
.withBidPrice("0.10") // $0.10
.withMarket(MarketType.SPOT) //Verwenden Sie die Spot-Instanz
.withInstanceCount(Resource.getInt("emr.core.instance-count"))
.withEbsConfiguration(
new EbsConfiguration()
.withEbsBlockDeviceConfigs(
new EbsBlockDeviceConfig()
.withVolumeSpecification(
new VolumeSpecification().withSizeInGB(Resource.getInt("emr.core.volume-size")).withVolumeType(Resource.getString("emr.core.volume-type")))
.withVolumesPerInstance(1))
.withEbsOptimized(true))
.withInstanceRole(CORE)
.withInstanceType(Resource.getString("emr.core.instance-type"))
.withName(Resource.getString("emr.core.name"));
}
Sie sollten in der Lage sein, zu bestätigen, dass es auf der Spot-Instanz ausgeführt wird, indem Sie die Registerkarte "Zusammenfassung" von EMR überprüfen.
Recommended Posts