I wanted to launch Spark jobs from a Java web front end, so this time I will submit Spark jobs to EMR using the AWS SDK for Java. I couldn't find an article that covered this, so I'm leaving my notes here on Qiita.
The job I made simply converts CSV data to Parquet. First I start a cluster on EMR and run the job from the management console, and then I automate the same flow with the SDK.
The source is here. https://github.com/uzresk/spark-samples.git
The job receives the CSV bucket URL and the Parquet output URL as arguments. The schema is inferred automatically, the DataFrame is read with the header option enabled, and the result is written out as Parquet.
Java
Create a jar file with mvn package.
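For reference, a minimal dependency section of the pom.xml might look like the sketch below. The versions are assumptions on my part (match them to your EMR release), and Spark is marked provided because the cluster already supplies it.
pom.xml (excerpt)
<dependencies>
    <dependency>
        <!-- Provided by the EMR cluster at runtime, so not bundled into the jar -->
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.4.0</version>
        <scope>provided</scope>
    </dependency>
</dependencies>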
Csv2Parquet.java
package jp.gr.java_conf.uzresk.samples.spark.jobs;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
public class Csv2Parquet {

    public static void main(String[] args) {
        String input = args[0];
        String output = args[1];

        SparkSession spark = SparkSession.builder().appName("Csv2ParquetJavaApplication").getOrCreate();

        // Read the CSV with a header row, inferring the schema automatically
        Dataset<Row> df = spark.read().option("header", "true").option("inferSchema", "true").csv(input);

        // Write the result as Parquet, overwriting any existing output
        df.write().mode(SaveMode.Overwrite).parquet(output);
    }
}
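For a quick check, the jar can be run with spark-submit like this; the jar name and S3 paths below are placeholders:
spark-submit --class jp.gr.java_conf.uzresk.samples.spark.jobs.Csv2Parquet \
    spark-samples.jar s3://your-bucket/input/ s3://your-bucket/output/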
Scala
As a bonus, I also wrote the same job in Scala. Package it with sbt.
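A minimal build.sbt might look like the sketch below; the Scala and Spark versions are assumptions, so align them with your EMR release.
build.sbt (excerpt)
scalaVersion := "2.11.12"
// Spark is provided by the EMR cluster at runtime
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0" % "provided"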
Csv2Parquet.scala
package jp.gr.java_conf.uzresk.samples.spark.jobs
import org.apache.spark.sql.{SaveMode, SparkSession}
object Csv2Parquet {

  def main(args: Array[String]): Unit = {
    val input = args(0)
    val output = args(1)

    val spark = SparkSession.builder.appName("Csv2ParquetScalaApplication").getOrCreate

    // Read the CSV with a header row, inferring the schema automatically
    val df = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv(input)

    // Write the result as Parquet, overwriting any existing output
    df.write.mode(SaveMode.Overwrite).parquet(output)
  }
}
What should you watch out for when starting the cluster? I tried this with about 2GB of data, and the conversion completed in about 2 minutes with 2 core nodes.
After pressing the launch button for the cluster, add the job as a step.
If a step fails, you can check its standard error output by clicking the arrow next to it. Note that nothing will appear there unless you have configured logging for the cluster.
To do the same from the SDK, define the job in a StepConfig, specify the cluster ID, and add the step. Since this mirrors exactly what you set on the console screen, there should be little room for confusion.
SparkSubmitSingleStepRunningCluster.java
package jp.gr.java_conf.uzresk.samples.spark.emr;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder;
import com.amazonaws.services.elasticmapreduce.model.*;
import jp.gr.java_conf.uzresk.samples.spark.emr.utils.AmazonClientConfigurationBuilder;
import jp.gr.java_conf.uzresk.samples.spark.emr.utils.Resource;
import java.util.ArrayList;
import java.util.List;
public class SparkSubmitSingleStepRunningCluster {

    private static final String CLUSTER_ID = "j-1VOWW7554GD9Z";

    public static void main(String[] args) {
        new SparkSubmitSingleStepRunningCluster().startJob();
    }

    private void startJob() {
        ClientConfiguration cc = AmazonClientConfigurationBuilder.clientConfiguration().orElse(new ClientConfiguration());
        AmazonElasticMapReduce emr = AmazonElasticMapReduceClientBuilder.standard()
                .withClientConfiguration(cc)
                .withRegion(Regions.AP_NORTHEAST_1)
                .build();

        // Add the step to the already-running cluster
        List<StepConfig> stepConfigs = new ArrayList<>();
        stepConfigs.add(createCsv2ParquetConfig());
        AddJobFlowStepsRequest req = new AddJobFlowStepsRequest(CLUSTER_ID, stepConfigs);
        AddJobFlowStepsResult result = emr.addJobFlowSteps(req);
        System.out.println(result.getStepIds());
    }

    private StepConfig createCsv2ParquetConfig() {
        // command-runner.jar lets us run spark-submit as an EMR step
        HadoopJarStepConfig sparkStepConf = new HadoopJarStepConfig()
                .withJar("command-runner.jar")
                .withArgs("spark-submit",
                        "--executor-memory", "1g",
                        "--deploy-mode", "cluster",
                        "--class", Resource.getString("job.class"),
                        Resource.getString("job.jar"),
                        Resource.getString("csv2parquet.csv-file"),
                        Resource.getString("csv2parquet.parquet-path"));
        return new StepConfig()
                .withName("Csv2Parquet")
                .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
                .withHadoopJarStep(sparkStepConf);
    }
}
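The Resource class is a small helper in the sample repository, presumably backed by a properties file. The keys are the ones referenced in the code above; the values below are placeholders I made up:
resource.properties (excerpt; all values are placeholders)
job.class=jp.gr.java_conf.uzresk.samples.spark.jobs.Csv2Parquet
job.jar=s3://your-bucket/jars/spark-samples.jar
csv2parquet.csv-file=s3://your-bucket/input/
csv2parquet.parquet-path=s3://your-bucket/output/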
Next, let's submit a job including the cluster startup, written as shown below. There are many parameters for starting an EMR cluster, but apart from that, the code is the same as the previous example.
SparkSubmitIncludingStartupCluster.java
package jp.gr.java_conf.uzresk.samples.spark.emr;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder;
import com.amazonaws.services.elasticmapreduce.model.*;
import jp.gr.java_conf.uzresk.samples.spark.emr.utils.AmazonClientConfigurationBuilder;
import jp.gr.java_conf.uzresk.samples.spark.emr.utils.Resource;
import java.util.ArrayList;
import java.util.List;
import static com.amazonaws.services.elasticmapreduce.model.InstanceRoleType.CORE;
import static com.amazonaws.services.elasticmapreduce.model.InstanceRoleType.MASTER;
public class SparkSubmitIncludingStartupCluster {

    public static void main(String[] args) {
        ClientConfiguration cc = AmazonClientConfigurationBuilder.clientConfiguration().orElse(new ClientConfiguration());
        AmazonElasticMapReduce emr = AmazonElasticMapReduceClientBuilder.standard()
                .withClientConfiguration(cc)
                .withRegion(Regions.AP_NORTHEAST_1)
                .build();
        new SparkSubmitIncludingStartupCluster().launch(emr);
    }

    private void launch(AmazonElasticMapReduce emr) {
        RunJobFlowRequest request = createRequest();
        RunJobFlowResult result = emr.runJobFlow(request);
        System.out.println(result.getJobFlowId() + " is starting");
    }

    private RunJobFlowRequest createRequest() {
        return new RunJobFlowRequest()
                .withName(Resource.getString("emr.cluster-name"))
                .withReleaseLabel(Resource.getString("emr.release-label"))
                .withSteps(createStepConfig())
                .withApplications(createApplicationList())
                .withTags(new Tag("Name", Resource.getString("emr.cluster-name")))
                .withEbsRootVolumeSize(Resource.getInt("emr.ebs-root-volume-size"))
                .withServiceRole(Resource.getString("emr.service-role"))
                .withAutoScalingRole(Resource.getString("emr.auto-scaling-role"))
                .withJobFlowRole(Resource.getString("emr.job-flow-role"))
                .withLogUri(Resource.getString("emr.log-uri"))
                .withVisibleToAllUsers(true)
                .withInstances(createJobFlowInstancesConfig(createMasterConfig(), createCoreConfig()));
    }

    private List<StepConfig> createStepConfig() {
        List<StepConfig> configs = new ArrayList<>();
        configs.add(createDebugConfig());
        configs.add(createCsv2ParquetConfig());
        return configs;
    }

    private StepConfig createDebugConfig() {
        // Enable EMR debugging so step logs can be browsed from the console
        String COMMAND_RUNNER = "command-runner.jar";
        String DEBUGGING_COMMAND = "state-pusher-script";
        String DEBUGGING_NAME = "Setup Hadoop Debugging";
        return new StepConfig()
                .withName(DEBUGGING_NAME)
                .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
                .withHadoopJarStep(
                        new HadoopJarStepConfig().withJar(COMMAND_RUNNER).withArgs(DEBUGGING_COMMAND));
    }

    private StepConfig createCsv2ParquetConfig() {
        HadoopJarStepConfig sparkStepConf = new HadoopJarStepConfig()
                .withJar("command-runner.jar")
                .withArgs("spark-submit",
                        "--executor-memory", "1g",
                        "--deploy-mode", "cluster",
                        "--class", Resource.getString("job.class"),
                        Resource.getString("job.jar"),
                        Resource.getString("csv2parquet.csv-file"),
                        Resource.getString("csv2parquet.parquet-path"));
        return new StepConfig()
                .withName("Csv2Parquet")
                .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
                .withHadoopJarStep(sparkStepConf);
    }

    private ArrayList<Application> createApplicationList() {
        ArrayList<Application> applications = new ArrayList<>();
        applications.add(new Application().withName("Spark"));
        return applications;
    }

    private InstanceGroupConfig createMasterConfig() {
        return new InstanceGroupConfig()
                .withInstanceCount(Resource.getInt("emr.master.instance-count"))
                .withEbsConfiguration(
                        new EbsConfiguration()
                                .withEbsBlockDeviceConfigs(
                                        new EbsBlockDeviceConfig()
                                                .withVolumeSpecification(
                                                        new VolumeSpecification().withSizeInGB(Resource.getInt("emr.master.volume-size")).withVolumeType(Resource.getString("emr.master.volume-type")))
                                                .withVolumesPerInstance(1))
                                .withEbsOptimized(true))
                .withInstanceRole(MASTER)
                .withInstanceType(Resource.getString("emr.master.instance-type"))
                .withName(Resource.getString("emr.master.name"));
    }

    private InstanceGroupConfig createCoreConfig() {
        return new InstanceGroupConfig()
                .withInstanceCount(Resource.getInt("emr.core.instance-count"))
                .withEbsConfiguration(
                        new EbsConfiguration()
                                .withEbsBlockDeviceConfigs(
                                        new EbsBlockDeviceConfig()
                                                .withVolumeSpecification(
                                                        new VolumeSpecification().withSizeInGB(Resource.getInt("emr.core.volume-size")).withVolumeType(Resource.getString("emr.core.volume-type")))
                                                .withVolumesPerInstance(1))
                                .withEbsOptimized(true))
                .withInstanceRole(CORE)
                .withInstanceType(Resource.getString("emr.core.instance-type"))
                .withName(Resource.getString("emr.core.name"));
    }

    private JobFlowInstancesConfig createJobFlowInstancesConfig(
            InstanceGroupConfig masterConfiguration, InstanceGroupConfig coreConfiguration) {
        return new JobFlowInstancesConfig()
                .withInstanceGroups(masterConfiguration, coreConfiguration)
                .withEc2SubnetId(Resource.getString("emr.subnet-id"))
                .withEmrManagedMasterSecurityGroup(Resource.getString("emr.master-sg"))
                .withEmrManagedSlaveSecurityGroup(Resource.getString("emr.slave-sg"))
                .withEc2KeyName(Resource.getString("emr.key-name"))
                .withKeepJobFlowAliveWhenNoSteps(false); // Terminate the cluster once there are no more steps
    }
}
Because withKeepJobFlowAliveWhenNoSteps on the last line is set to false, the cluster shuts down automatically after the job finishes.
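For reference, the emr.* keys used above might be defined like the sketch below. Every value is a placeholder (substitute your own IDs, roles, and buckets), and the release label is just an example:
resource.properties (excerpt; all values are placeholders)
emr.cluster-name=csv2parquet-cluster
emr.release-label=emr-5.20.0
emr.ebs-root-volume-size=10
emr.service-role=EMR_DefaultRole
emr.auto-scaling-role=EMR_AutoScaling_DefaultRole
emr.job-flow-role=EMR_EC2_DefaultRole
emr.log-uri=s3://your-bucket/emr-logs/
emr.subnet-id=subnet-xxxxxxxx
emr.master-sg=sg-xxxxxxxx
emr.slave-sg=sg-xxxxxxxx
emr.key-name=your-key-pair
emr.master.instance-count=1
emr.master.instance-type=m4.xlarge
emr.master.volume-size=32
emr.master.volume-type=gp2
emr.master.name=master
emr.core.instance-count=2
emr.core.instance-type=m4.xlarge
emr.core.volume-size=32
emr.core.volume-type=gp2
emr.core.name=core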
Batch processing can be run even more cheaply with Spot Instances; in my experience it comes to less than half the price.
I'm using m4.xlarge, which is $0.258 per hour on demand. The spot price seemed to hover around $0.06 to $0.10, so I set the bid at around $0.10 and started the cluster. (I wanted it to launch reliably this time.)
With the SDK, you can switch to Spot Instances just by adding two lines each to the MASTER and CORE instance group configurations.
SparkSubmitIncludingStartupSpotCluster.java
private InstanceGroupConfig createMasterConfig() {
    return new InstanceGroupConfig()
            .withBidPrice("0.10") // Bid price: $0.10
            .withMarket(MarketType.SPOT) // Use a Spot Instance
            .withInstanceCount(Resource.getInt("emr.master.instance-count"))
            .withEbsConfiguration(
                    new EbsConfiguration()
                            .withEbsBlockDeviceConfigs(
                                    new EbsBlockDeviceConfig()
                                            .withVolumeSpecification(
                                                    new VolumeSpecification().withSizeInGB(Resource.getInt("emr.master.volume-size")).withVolumeType(Resource.getString("emr.master.volume-type")))
                                            .withVolumesPerInstance(1))
                            .withEbsOptimized(true))
            .withInstanceRole(MASTER)
            .withInstanceType(Resource.getString("emr.master.instance-type"))
            .withName(Resource.getString("emr.master.name"));
}

private InstanceGroupConfig createCoreConfig() {
    return new InstanceGroupConfig()
            .withBidPrice("0.10") // Bid price: $0.10
            .withMarket(MarketType.SPOT) // Use a Spot Instance
            .withInstanceCount(Resource.getInt("emr.core.instance-count"))
            .withEbsConfiguration(
                    new EbsConfiguration()
                            .withEbsBlockDeviceConfigs(
                                    new EbsBlockDeviceConfig()
                                            .withVolumeSpecification(
                                                    new VolumeSpecification().withSizeInGB(Resource.getInt("emr.core.volume-size")).withVolumeType(Resource.getString("emr.core.volume-type")))
                                            .withVolumesPerInstance(1))
                            .withEbsOptimized(true))
            .withInstanceRole(CORE)
            .withInstanceType(Resource.getString("emr.core.instance-type"))
            .withName(Resource.getString("emr.core.name"));
}
You should be able to confirm that the cluster is running on Spot Instances from the "Summary" tab in the EMR console.
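You can also check this from the SDK. Below is a minimal sketch using the ListInstanceGroups API; the cluster ID is a placeholder, and emr is the same client as in the examples above.
// Print the market type (ON_DEMAND or SPOT) of each instance group
ListInstanceGroupsResult groups = emr.listInstanceGroups(
        new ListInstanceGroupsRequest().withClusterId("j-XXXXXXXXXXXXX"));
for (InstanceGroup group : groups.getInstanceGroups()) {
    System.out.println(group.getName() + ": " + group.getMarket());
}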