Try Spark Submit to EMR using AWS SDK for Java

Introduction

I wanted to kick off Spark jobs from a Java web UI, so this time I submit Spark jobs to EMR using the AWS SDK for Java. I couldn't find an article that covered exactly this, so I'm leaving my notes here on Qiita.

This time I made a job that simply converts CSV data to Parquet. First, I start a cluster on EMR and run the job from the management console. Then I automate the following:

  1. Start the EMR cluster
  2. Run the job (step)
  3. Stop the EMR cluster

The source is here. https://github.com/uzresk/spark-samples.git

Create a job to convert CSV to Parquet

It receives the CSV bucket URL and the Parquet output destination URL as arguments. The schema is inferred automatically, the DataFrame is read with the header option set to true, and the result is written out as Parquet.

Java

Create a jar file with mvn package.

Csv2Parquet.java


package jp.gr.java_conf.uzresk.samples.spark.jobs;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class Csv2Parquet {

    public static void main(String[] args) {

        String input = args[0];
        String output = args[1];

        SparkSession spark = SparkSession.builder().appName("Csv2ParquetJavaApplication").getOrCreate();
        Dataset<Row> df = spark.read().option("header", "true").option("inferSchema", "true").csv(input);
        df.write().mode(SaveMode.Overwrite).parquet(output);
    }
}

Scala

As a bonus, I wrote it in Scala. Please package with sbt.

Csv2Parquet.scala


package jp.gr.java_conf.uzresk.samples.spark.jobs

import org.apache.spark.sql.{SaveMode, SparkSession}

object Csv2Parquet {
  def main(args: Array[String]): Unit = {

    val input = args(0)
    val output = args(1)

    val spark = SparkSession.builder.appName("Csv2ParquetJavaAppliation").getOrCreate
    val df = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv(input)
    df.write.mode(SaveMode.Overwrite).parquet(output)
  }
}

Start the EMR cluster and manually submit the JOB

A few points to note when starting the cluster:

1. For software, select just Spark.

image.png

2. Adjust the number of core nodes to match the size of the CSV data.

This time I tried it with about 2 GB of data; with 2 core nodes the conversion finished in about 2 minutes.

image.png

3. Set the log output location.

image.png

After clicking the button to start the cluster, add a step.

1. Open the Steps tab and click the "Add step" button.

image.png

2. Fill in the step details and add it.

image.png

3. If the cluster is running, the step will move from "Pending" to "Running" right away.

If a step fails, you can check its standard error output by clicking the arrow. Note that nothing will appear here if you have not configured logging.

image.png

Add STEP to the running cluster

Define the JOB in a StepConfig, specify the cluster ID, and add the STEP. The settings are the same as what you enter on the console screen, so there should be little to puzzle over.

SparkSubmitSingleStepRunningCluster.java


package jp.gr.java_conf.uzresk.samples.spark.emr;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder;
import com.amazonaws.services.elasticmapreduce.model.*;
import jp.gr.java_conf.uzresk.samples.spark.emr.utils.AmazonClientConfigurationBuilder;
import jp.gr.java_conf.uzresk.samples.spark.emr.utils.Resource;

import java.util.ArrayList;
import java.util.List;

public class SparkSubmitSingleStepRunningCluster {

    private static final String CLUSTER_ID = "j-1VOWW7554GD9Z";

    public static void main(String[] args) {

        new SparkSubmitSingleStepRunningCluster().startJob();
    }

    private void startJob() {
        ClientConfiguration cc = AmazonClientConfigurationBuilder.clientConfiguration().orElse(new ClientConfiguration());
        AmazonElasticMapReduce emr = AmazonElasticMapReduceClientBuilder.standard()
                .withClientConfiguration(cc)
                .withRegion(Regions.AP_NORTHEAST_1)
                .build();

        List<StepConfig> stepConfigs = new ArrayList<>();
        stepConfigs.add(createCsv2ParquetConfig());

        AddJobFlowStepsRequest req = new AddJobFlowStepsRequest(CLUSTER_ID, stepConfigs);
        AddJobFlowStepsResult result = emr.addJobFlowSteps(req);

        System.out.println(result.getStepIds());
    }

    private StepConfig createCsv2ParquetConfig() {
        HadoopJarStepConfig sparkStepConf = new HadoopJarStepConfig()
                .withJar("command-runner.jar")
                .withArgs("spark-submit",
                        "--executor-memory", "1g",
                        "--deploy-mode", "cluster",
                        "--class", Resource.getString("job.class"),
                        Resource.getString("job.jar"),
                        Resource.getString("csv2parquet.csv-file"),
                        Resource.getString("csv2parquet.parquet-path"));
        return new StepConfig()
                .withName("Csv2Parquet")
                .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
                .withHadoopJarStep(sparkStepConf);
    }
}
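The Resource and AmazonClientConfigurationBuilder classes used above are small utilities included in the sample repository: Resource looks up the job jar location, main class and S3 paths, and AmazonClientConfigurationBuilder returns an optional ClientConfiguration (falling back to the default, as the orElse above shows). If you don't want to pull them from the repository, a minimal stand-in for Resource could look like the sketch below; the properties file name emr.properties and its location on the classpath are assumptions on my part.

package jp.gr.java_conf.uzresk.samples.spark.emr.utils;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

// Minimal stand-in for the Resource utility referenced above.
// It reads keys such as job.class, job.jar and csv2parquet.csv-file
// from a properties file on the classpath.
public class Resource {

    private static final Properties PROPS = new Properties();

    static {
        try (InputStream in = Resource.class.getClassLoader()
                .getResourceAsStream("emr.properties")) {
            if (in == null) {
                throw new IllegalStateException("emr.properties not found on the classpath");
            }
            PROPS.load(in);
        } catch (IOException e) {
            throw new IllegalStateException("Failed to load emr.properties", e);
        }
    }

    public static String getString(String key) {
        return PROPS.getProperty(key);
    }

    public static int getInt(String key) {
        return Integer.parseInt(PROPS.getProperty(key));
    }
}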

Cluster startup → STEP execution → Cluster stop automation

The code looks like this. There are quite a few parameters for starting an EMR cluster, but apart from that it is the same as the previous example.

SparkSubmitIncludingStartupCluster.java


package jp.gr.java_conf.uzresk.samples.spark.emr;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder;
import com.amazonaws.services.elasticmapreduce.model.*;
import jp.gr.java_conf.uzresk.samples.spark.emr.utils.AmazonClientConfigurationBuilder;
import jp.gr.java_conf.uzresk.samples.spark.emr.utils.Resource;

import java.util.ArrayList;
import java.util.List;

import static com.amazonaws.services.elasticmapreduce.model.InstanceRoleType.CORE;
import static com.amazonaws.services.elasticmapreduce.model.InstanceRoleType.MASTER;

public class SparkSubmitIncludingStartupCluster {

    public static void main(String[] args) {
        ClientConfiguration cc = AmazonClientConfigurationBuilder.clientConfiguration().orElse(new ClientConfiguration());
        AmazonElasticMapReduce emr = AmazonElasticMapReduceClientBuilder.standard()
                .withClientConfiguration(cc)
                .withRegion(Regions.AP_NORTHEAST_1)
                .build();
        new SparkSubmitIncludingStartupCluster().launch(emr);
    }

    private void launch(AmazonElasticMapReduce emr) {

        RunJobFlowRequest request = createRequest();

        RunJobFlowResult result = emr.runJobFlow(request);

        System.out.println(result.getJobFlowId() + " is starting");

    }

    private RunJobFlowRequest createRequest() {

        return new RunJobFlowRequest()
                .withName(Resource.getString("emr.cluster-name"))
                .withReleaseLabel(Resource.getString("emr.release-label"))
                .withSteps(createStepConfig())
                .withApplications(createApplicationList())
                .withTags(new Tag("Name", Resource.getString("emr.cluster-name")))
                .withEbsRootVolumeSize(Resource.getInt("emr.ebs-root-volume-size"))
                .withServiceRole(Resource.getString("emr.service-role"))
                .withAutoScalingRole(Resource.getString("emr.auto-scaling-role"))
                .withJobFlowRole(Resource.getString("emr.job-flow-role"))
                .withLogUri(Resource.getString("emr.log-uri"))
                .withVisibleToAllUsers(true)
                .withInstances(createJobFlowInstancesConfig(createMasterConfig(), createCoreConfig()));
    }

    private List<StepConfig> createStepConfig() {
        List<StepConfig> configs = new ArrayList<>();
        configs.add(createDebugConfig());
        configs.add(createCsv2ParquetConfig());
        return configs;
    }

    private StepConfig createDebugConfig() {
        String COMMAND_RUNNER = "command-runner.jar";
        String DEBUGGING_COMMAND = "state-pusher-script";
        String DEBUGGING_NAME = "Setup Hadoop Debugging";

        return new StepConfig()
                .withName(DEBUGGING_NAME)
                .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
                .withHadoopJarStep(
                        new HadoopJarStepConfig().withJar(COMMAND_RUNNER).withArgs(DEBUGGING_COMMAND));
    }

    private StepConfig createCsv2ParquetConfig() {
        HadoopJarStepConfig sparkStepConf = new HadoopJarStepConfig()
                .withJar("command-runner.jar")
                .withArgs("spark-submit",
                        "--executor-memory", "1g",
                        "--deploy-mode", "cluster",
                        "--class", Resource.getString("job.class"),
                        Resource.getString("job.jar"),
                        Resource.getString("csv2parquet.csv-file"),
                        Resource.getString("csv2parquet.parquet-path"));
        return new StepConfig()
                .withName("Csv2Parquet")
                .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
                .withHadoopJarStep(sparkStepConf);
    }

    private ArrayList<Application> createApplicationList() {
        ArrayList<Application> applications = new ArrayList<>();
        applications.add(new Application().withName("Spark"));
        return applications;
    }

    private InstanceGroupConfig createMasterConfig() {
        return new InstanceGroupConfig()
                .withInstanceCount(Resource.getInt("emr.master.instance-count"))
                .withEbsConfiguration(
                        new EbsConfiguration()
                                .withEbsBlockDeviceConfigs(
                                        new EbsBlockDeviceConfig()
                                                .withVolumeSpecification(
                                                        new VolumeSpecification().withSizeInGB(Resource.getInt("emr.master.volume-size")).withVolumeType(Resource.getString("emr.master.volume-type")))
                                                .withVolumesPerInstance(1))
                                .withEbsOptimized(true))
                .withInstanceRole(MASTER)
                .withInstanceType(Resource.getString("emr.master.instance-type"))
                .withName(Resource.getString("emr.master.name"));
    }

    private InstanceGroupConfig createCoreConfig() {
        return new InstanceGroupConfig()
                .withInstanceCount(Resource.getInt("emr.core.instance-count"))
                .withEbsConfiguration(
                        new EbsConfiguration()
                                .withEbsBlockDeviceConfigs(
                                        new EbsBlockDeviceConfig()
                                                .withVolumeSpecification(
                                                        new VolumeSpecification().withSizeInGB(Resource.getInt("emr.core.volume-size")).withVolumeType(Resource.getString("emr.core.volume-type")))
                                                .withVolumesPerInstance(1))
                                .withEbsOptimized(true))
                .withInstanceRole(CORE)
                .withInstanceType(Resource.getString("emr.core.instance-type"))
                .withName(Resource.getString("emr.core.name"));
    }

    private JobFlowInstancesConfig createJobFlowInstancesConfig(
            InstanceGroupConfig masterConfiguration, InstanceGroupConfig coreConfiguration) {
        return new JobFlowInstancesConfig()
                .withInstanceGroups(masterConfiguration, coreConfiguration)
                .withEc2SubnetId(Resource.getString("emr.subnet-id"))
                .withEmrManagedMasterSecurityGroup(Resource.getString("emr.master-sg"))
                .withEmrManagedSlaveSecurityGroup(Resource.getString("emr.slave-sg"))
                .withEc2KeyName(Resource.getString("emr.key-name"))
                .withKeepJobFlowAliveWhenNoSteps(false); // stop the cluster when the JOB finishes
    }
}

If withKeepJobFlowAliveWhenNoSteps on the last line is set to false, the cluster terminates automatically after the JOB finishes.
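If you also want the Java side to wait for the step to finish (for example, to trigger follow-up processing), you can poll the step state with the same SDK. This is a minimal sketch; it only assumes the cluster ID and a step ID (such as one of the IDs printed by getStepIds() in the earlier example), and the polling interval is just illustrative.

import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.model.DescribeStepRequest;
import com.amazonaws.services.elasticmapreduce.model.StepState;

public class StepWatcher {

    // Polls the step state until it reaches a terminal state
    // (COMPLETED, FAILED, CANCELLED or INTERRUPTED) and returns it.
    public static StepState waitForStep(AmazonElasticMapReduce emr, String clusterId, String stepId)
            throws InterruptedException {
        while (true) {
            String state = emr.describeStep(new DescribeStepRequest()
                            .withClusterId(clusterId)
                            .withStepId(stepId))
                    .getStep().getStatus().getState();
            System.out.println(stepId + " is " + state);
            StepState stepState = StepState.fromValue(state);
            if (stepState == StepState.COMPLETED || stepState == StepState.FAILED
                    || stepState == StepState.CANCELLED || stepState == StepState.INTERRUPTED) {
                return stepState;
            }
            Thread.sleep(30_000); // check again in 30 seconds
        }
    }
}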

Use Spot Instances

Batch processing can be run even more cheaply by using Spot Instances. In my experience it comes to less than half the price.

I'm using m4.xlarge, which is $0.258/hour on demand. The spot price seemed to hover around $0.06 to $0.10, so I set the bid price to $0.10 and started the cluster (I wanted it to run reliably this time).

When using the SDK, all it takes is two extra lines in each of the MASTER and CORE instance group configurations.

SparkSubmitIncludingStartupSpotCluster.java


    private InstanceGroupConfig createMasterConfig() {
        return new InstanceGroupConfig()
                .withBidPrice("0.10")          // $0.10
                .withMarket(MarketType.SPOT)   //Use Spot instance
                .withInstanceCount(Resource.getInt("emr.master.instance-count"))
                .withEbsConfiguration(
                        new EbsConfiguration()
                                .withEbsBlockDeviceConfigs(
                                        new EbsBlockDeviceConfig()
                                                .withVolumeSpecification(
                                                        new VolumeSpecification().withSizeInGB(Resource.getInt("emr.master.volume-size")).withVolumeType(Resource.getString("emr.master.volume-type")))
                                                .withVolumesPerInstance(1))
                                .withEbsOptimized(true))
                .withInstanceRole(MASTER)
                .withInstanceType(Resource.getString("emr.master.instance-type"))
                .withName(Resource.getString("emr.master.name"));
    }

    private InstanceGroupConfig createCoreConfig() {
        return new InstanceGroupConfig()
                .withBidPrice("0.10")          // $0.10
                .withMarket(MarketType.SPOT)   //Use Spot instance
                .withInstanceCount(Resource.getInt("emr.core.instance-count"))
                .withEbsConfiguration(
                        new EbsConfiguration()
                                .withEbsBlockDeviceConfigs(
                                        new EbsBlockDeviceConfig()
                                                .withVolumeSpecification(
                                                        new VolumeSpecification().withSizeInGB(Resource.getInt("emr.core.volume-size")).withVolumeType(Resource.getString("emr.core.volume-type")))
                                                .withVolumesPerInstance(1))
                                .withEbsOptimized(true))
                .withInstanceRole(CORE)
                .withInstanceType(Resource.getString("emr.core.instance-type"))
                .withName(Resource.getString("emr.core.name"));
    }

You can confirm that the cluster is running on Spot Instances in the "Summary" tab of the EMR console.

image.png
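If you would rather verify this from code than from the console, the same SDK can list the cluster's instance groups and show which market each one is using. A minimal sketch, assuming you have the cluster ID returned by runJobFlow:

import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.model.InstanceGroup;
import com.amazonaws.services.elasticmapreduce.model.ListInstanceGroupsRequest;

public class MarketTypeChecker {

    // Prints the market type (SPOT or ON_DEMAND) of each instance group in the cluster.
    public static void printMarketTypes(AmazonElasticMapReduce emr, String clusterId) {
        for (InstanceGroup group : emr.listInstanceGroups(
                new ListInstanceGroupsRequest().withClusterId(clusterId)).getInstanceGroups()) {
            System.out.println(group.getName() + " : " + group.getMarket());
        }
    }
}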
