Versuchen Sie Spark Submit to EMR mit AWS SDK für Java

Einführung

Ich wollte Spark vom Bildschirm des in Java erstellten WEB aus starten, daher möchte ich dieses Mal Spark mit dem AWS SDK für Java einreichen. Das ist es! Ich konnte den Artikel nicht finden, also werde ich ihn auf Qiita belassen.

Diesmal habe ich einen Job gemacht, der nur CSV-Daten in Parkett konvertiert. Starten Sie den Cluster zunächst mit EMR und führen Sie ihn über die Verwaltungskonsole aus. Als nächstes möchte ich Folgendes automatisieren.

  1. Starten Sie EMR
  2. JOB-Ausführung
  3. EMR-Cluster stoppen

Die Quelle ist hier. https://github.com/uzresk/spark-samples.git

Erstellen Sie einen Job, um CSV in Parkett umzuwandeln

Empfängt die CSV-Bucket-URL und die Parkett-Ausgabeziel-URL als Argumente. Das Schema wird automatisch konvertiert, der Datenrahmen wird mit dem Header "Ja" gelesen und in Parkett konvertiert.

Java

Erstellen Sie eine JAR-Datei mit dem MVN-Paket.

Csv2Parquet.java


package jp.gr.java_conf.uzresk.samples.spark.jobs;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class Csv2Parquet {

    public static void main(String[] args) {

        String input = args[0];
        String output = args[1];

        SparkSession spark = SparkSession.builder().appName("Csv2ParquetJavaApplication").getOrCreate();
        Dataset<Row> df = spark.read().option("header", "true").option("inferSchema", "true").csv(input);
        df.write().mode(SaveMode.Overwrite).parquet(output);
    }
}

Scala

Als Bonus habe ich es in Scala geschrieben. Bitte mit sbt verpacken.

Csv2Parquet.scala


package jp.gr.java_conf.uzresk.samples.spark.jobs

import org.apache.spark.sql.{SaveMode, SparkSession}

object Csv2Parquet {
  def main(args: Array[String]) {

    val input = args(0)
    val output = args(1)

    val spark = SparkSession.builder.appName("Csv2ParquetJavaAppliation").getOrCreate
    val df = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv(input)
    df.write.mode(SaveMode.Overwrite).parquet(output)
  }
}

Starten Sie einen EMR-Cluster und senden Sie manuell einen Job

Worüber sollten Sie sich beim Starten eines Clusters Gedanken machen?

    1. Die Software ist nur Spark.

image.png

  1. Passen Sie die Anzahl der Kernknoten an die Größe der CSV-Daten usw. an.

Diesmal habe ich es mit ca. 2GB Daten versucht. Die Konvertierung wird in 2 Minuten mit 2 Kernknoten abgeschlossen.

image.png

    1. Lassen Sie uns das Protokoll einstellen

image.png

Fügen Sie nach dem Drücken der Bootcluster-Schaltfläche weitere Schritte hinzu.

    1. Öffnen Sie die Registerkarte Schritte und klicken Sie auf die Schaltfläche "Schritt hinzufügen".

image.png

  1. Ich werde Schritte hinzufügen

image.png

    1. Wenn der Cluster ausgeführt wird, wechselt der Prozess sofort von "ausstehend" zu "ausgeführt".

Wenn dies fehlschlägt, können Sie die Standardfehlerausgabe überprüfen, indem Sie auf den Pfeil klicken. Bitte beachten Sie, dass hier nichts angezeigt wird, wenn Sie das Protokoll nicht festgelegt haben.

image.png

Fügen Sie dem laufenden Cluster STEP hinzu

Definieren Sie JOB in StepConfig, geben Sie die Cluster-ID an und fügen Sie STEP hinzu. Da es mit dem auf dem Bildschirm eingestellten Inhalt identisch ist, gibt es meines Erachtens nicht viel Zögern.

SparkSubmitSingleStepRunningCluster.java


package jp.gr.java_conf.uzresk.samples.spark.emr;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder;
import com.amazonaws.services.elasticmapreduce.model.*;
import jp.gr.java_conf.uzresk.samples.spark.emr.utils.AmazonClientConfigurationBuilder;
import jp.gr.java_conf.uzresk.samples.spark.emr.utils.Resource;

import java.util.ArrayList;
import java.util.List;

public class SparkSubmitSingleStepRunningCluster {

    private static final String CLUSTER_ID = "j-1VOWW7554GD9Z";

    public static void main(String[] args) {

        new SparkSubmitSingleStepRunningCluster().startJob();
    }

    private void startJob() {
        ClientConfiguration cc = AmazonClientConfigurationBuilder.clientConfiguration().orElse(new ClientConfiguration());
        AmazonElasticMapReduce emr = AmazonElasticMapReduceClientBuilder.standard()
                .withClientConfiguration(cc)
                .withRegion(Regions.AP_NORTHEAST_1)
                .build();

        List<StepConfig> stepConfigs = new ArrayList<>();
        stepConfigs.add(createCsv2ParquetConfig());

        AddJobFlowStepsRequest req = new AddJobFlowStepsRequest(CLUSTER_ID, stepConfigs);
        AddJobFlowStepsResult result = emr.addJobFlowSteps(req);

        System.out.println(result.getStepIds());
    }

    private StepConfig createCsv2ParquetConfig() {
        HadoopJarStepConfig sparkStepConf = new HadoopJarStepConfig()
                .withJar("command-runner.jar")
                .withArgs("spark-submit",
                        "--executor-memory", "1g",
                        "--deploy-mode", "cluster",
                        "--class", Resource.getString("job.class"),
                        Resource.getString("job.jar"),
                        Resource.getString("csv2parquet.csv-file"),
                        Resource.getString("csv2parquet.parquet-path"));
        return new StepConfig()
                .withName("Csv2Parquet")
                .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
                .withHadoopJarStep(sparkStepConf);
    }
}

Clusterstart automatisieren → STEP-Ausführung → Clusterstopp

Ich werde es so schreiben. Es gibt viele Parameter zum Starten des EMR-Clusters, aber ansonsten ist es dasselbe wie im vorherigen Beispiel.

SparkSubmitIncludingStartupCluster.java


package jp.gr.java_conf.uzresk.samples.spark.emr;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder;
import com.amazonaws.services.elasticmapreduce.model.*;
import jp.gr.java_conf.uzresk.samples.spark.emr.utils.AmazonClientConfigurationBuilder;
import jp.gr.java_conf.uzresk.samples.spark.emr.utils.Resource;

import java.util.ArrayList;
import java.util.List;

import static com.amazonaws.services.elasticmapreduce.model.InstanceRoleType.CORE;
import static com.amazonaws.services.elasticmapreduce.model.InstanceRoleType.MASTER;

public class SparkSubmitIncludingStartupCluster {

    public static void main(String[] args) {
        ClientConfiguration cc = AmazonClientConfigurationBuilder.clientConfiguration().orElse(new ClientConfiguration());
        AmazonElasticMapReduce emr = AmazonElasticMapReduceClientBuilder.standard()
                .withClientConfiguration(cc)
                .withRegion(Regions.AP_NORTHEAST_1)
                .build();
        new SparkSubmitIncludingStartupCluster().launch(emr);
    }

    private void launch(AmazonElasticMapReduce emr) {

        RunJobFlowRequest request = createRequest();

        RunJobFlowResult result = emr.runJobFlow(request);

        System.out.println(result.getJobFlowId() + " is starting");

    }

    private RunJobFlowRequest createRequest() {

        return new RunJobFlowRequest()
                .withName(Resource.getString("emr.cluster-name"))
                .withReleaseLabel(Resource.getString("emr.release-label"))
                .withSteps(createStepConfig())
                .withApplications(createApplicationList())
                .withTags(new Tag("Name", Resource.getString("emr.cluster-name")))
                .withEbsRootVolumeSize(Resource.getInt("emr.ebs-root-volume-size"))
                .withServiceRole(Resource.getString("emr.service-role"))
                .withAutoScalingRole(Resource.getString("emr.auto-scaling-role"))
                .withJobFlowRole(Resource.getString("emr.job-flow-role"))
                .withLogUri(Resource.getString("emr.log-uri"))
                .withVisibleToAllUsers(true)
                .withInstances(createJobFlowInstancesConfig(createMasterConfig(), createCoreConfig()));
    }

    private List<StepConfig> createStepConfig() {
        List<StepConfig> configs = new ArrayList<>();
        configs.add(createDebugConfig());
        configs.add(createCsv2ParquetConfig());
        return configs;
    }

    private StepConfig createDebugConfig() {
        String COMMAND_RUNNER = "command-runner.jar";
        String DEBUGGING_COMMAND = "state-pusher-script";
        String DEBUGGING_NAME = "Setup Hadoop Debugging";

        return new StepConfig()
                .withName(DEBUGGING_NAME)
                .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
                .withHadoopJarStep(
                        new HadoopJarStepConfig().withJar(COMMAND_RUNNER).withArgs(DEBUGGING_COMMAND));
    }

    private StepConfig createCsv2ParquetConfig() {
        HadoopJarStepConfig sparkStepConf = new HadoopJarStepConfig()
                .withJar("command-runner.jar")
                .withArgs("spark-submit",
                        "--executor-memory", "1g",
                        "--deploy-mode", "cluster",
                        "--class", Resource.getString("job.class"),
                        Resource.getString("job.jar"),
                        Resource.getString("csv2parquet.csv-file"),
                        Resource.getString("csv2parquet.parquet-path"));
        return new StepConfig()
                .withName("Csv2Parquet")
                .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
                .withHadoopJarStep(sparkStepConf);
    }

    private ArrayList<Application> createApplicationList() {
        ArrayList<Application> applications = new ArrayList<>();
        applications.add(new Application().withName("Spark"));
        return applications;
    }

    private InstanceGroupConfig createMasterConfig() {
        return new InstanceGroupConfig()
                .withInstanceCount(Resource.getInt("emr.master.instance-count"))
                .withEbsConfiguration(
                        new EbsConfiguration()
                                .withEbsBlockDeviceConfigs(
                                        new EbsBlockDeviceConfig()
                                                .withVolumeSpecification(
                                                        new VolumeSpecification().withSizeInGB(Resource.getInt("emr.master.volume-size")).withVolumeType(Resource.getString("emr.master.volume-type")))
                                                .withVolumesPerInstance(1))
                                .withEbsOptimized(true))
                .withInstanceRole(MASTER)
                .withInstanceType(Resource.getString("emr.master.instance-type"))
                .withName(Resource.getString("emr.master.name"));
    }

    private InstanceGroupConfig createCoreConfig() {
        return new InstanceGroupConfig()
                .withInstanceCount(Resource.getInt("emr.core.instance-count"))
                .withEbsConfiguration(
                        new EbsConfiguration()
                                .withEbsBlockDeviceConfigs(
                                        new EbsBlockDeviceConfig()
                                                .withVolumeSpecification(
                                                        new VolumeSpecification().withSizeInGB(Resource.getInt("emr.core.volume-size")).withVolumeType(Resource.getString("emr.core.volume-type")))
                                                .withVolumesPerInstance(1))
                                .withEbsOptimized(true))
                .withInstanceRole(CORE)
                .withInstanceType(Resource.getString("emr.core.instance-type"))
                .withName(Resource.getString("emr.core.name"));
    }

    private JobFlowInstancesConfig createJobFlowInstancesConfig(
            InstanceGroupConfig masterConfiguration, InstanceGroupConfig coreConfiguration) {
        return new JobFlowInstancesConfig()
                .withInstanceGroups(masterConfiguration, coreConfiguration)
                .withEc2SubnetId(Resource.getString("emr.subnet-id"))
                .withEmrManagedMasterSecurityGroup(Resource.getString("emr.master-sg"))
                .withEmrManagedSlaveSecurityGroup(Resource.getString("emr.slave-sg"))
                .withEc2KeyName(Resource.getString("emr.key-name"))
                .withKeepJobFlowAliveWhenNoSteps(fales); //Stoppen Sie die Instanz am Ende des Jobs
    }
}

Wenn Sie in der letzten Zeile withKeepJobFlowAliveWhenNoSteps auf false setzen, wird der Cluster nach Ausführung von JOB automatisch gestoppt.

Verwenden Sie Spot-Instanzen

Die Stapelverarbeitung kann mithilfe von Spot-Instanzen noch kostengünstiger ausgeführt werden. Aus Erfahrung wird es weniger als die Hälfte des Preises sein.

Ich verwende eine m4.xlarge-On-Demand-Instanz für 0,258 US-Dollar. Der Preis für Spot-Instanzen schien zwischen 0,06 und 0,10 zu liegen. Stellen Sie den Preis also auf 0,10 USD ein und starten Sie ihn. (Weil ich es diesmal sicher bewegen wollte)

Wenn Sie SDK verwenden, können Sie es verschieben, indem Sie jeweils 2 Zeilen für MASTER und CORE hinzufügen.

SparkSubmitIncludingStartupSpotCluster.java


    private InstanceGroupConfig createMasterConfig() {
        return new InstanceGroupConfig()
                .withBidPrice("0.10")          // $0.10
                .withMarket(MarketType.SPOT)   //Verwenden Sie die Spot-Instanz
                .withInstanceCount(Resource.getInt("emr.master.instance-count"))
                .withEbsConfiguration(
                        new EbsConfiguration()
                                .withEbsBlockDeviceConfigs(
                                        new EbsBlockDeviceConfig()
                                                .withVolumeSpecification(
                                                        new VolumeSpecification().withSizeInGB(Resource.getInt("emr.master.volume-size")).withVolumeType(Resource.getString("emr.master.volume-type")))
                                                .withVolumesPerInstance(1))
                                .withEbsOptimized(true))
                .withInstanceRole(MASTER)
                .withInstanceType(Resource.getString("emr.master.instance-type"))
                .withName(Resource.getString("emr.master.name"));
    }

    private InstanceGroupConfig createCoreConfig() {
        return new InstanceGroupConfig()
                .withBidPrice("0.10")          // $0.10
                .withMarket(MarketType.SPOT)   //Verwenden Sie die Spot-Instanz
                .withInstanceCount(Resource.getInt("emr.core.instance-count"))
                .withEbsConfiguration(
                        new EbsConfiguration()
                                .withEbsBlockDeviceConfigs(
                                        new EbsBlockDeviceConfig()
                                                .withVolumeSpecification(
                                                        new VolumeSpecification().withSizeInGB(Resource.getInt("emr.core.volume-size")).withVolumeType(Resource.getString("emr.core.volume-type")))
                                                .withVolumesPerInstance(1))
                                .withEbsOptimized(true))
                .withInstanceRole(CORE)
                .withInstanceType(Resource.getString("emr.core.instance-type"))
                .withName(Resource.getString("emr.core.name"));
    }

Sie sollten in der Lage sein, zu bestätigen, dass es auf der Spot-Instanz ausgeführt wird, indem Sie die Registerkarte "Zusammenfassung" von EMR überprüfen.

image.png

Recommended Posts

Versuchen Sie Spark Submit to EMR mit AWS SDK für Java
Verschlüsseln Sie Daten, die mit AWS SDK für Java / SSE-KMS in S3 hochgeladen wurden
Ich habe versucht, SQS mit AWS Java SDK zu betreiben
AWS SDK für Java 1.11.x und 2.x.
[Java] Versuchen Sie, mithilfe von Generika zu implementieren
Versuchen Sie es mit dem Java SDK von Hyperledger Iroha
Hochladen / Herunterladen / Löschen von Daten in S3 mithilfe von Amazon S3 Client Builder mit AWS SDK für Java
Versuchen Sie eine ähnliche Suche in der Bildsuche mit dem Java SDK [Suche]
[Java] [SQL Server] Stellen Sie mit JDBC für SQL Server eine Verbindung zum lokalen SQL Server 2017 her
Senden Sie einen Job an AWS Batch mit Java (Eclipse)
Initialisierung von for Versuchen Sie, das Java-Problem in TypeScript 5-4 zu ändern
Probieren Sie die ähnliche Suche von Image Search mit Java SDK [Registrierung] aus.
Versuchen Sie, mit Java eine Verbindung zu AzureCosmosDB Emulator for Docker herzustellen
Anmeldeinformationen, auf die das AWS SDK für Java standardmäßig verweist
Versuchen Sie, mit Docker eine Java-Entwicklungsumgebung zu erstellen
Befehl, um Docker vorerst zu verwenden
Richten Sie signierte Cookies (für CloudFront) mit benutzerdefinierten Richtlinien mithilfe des AWS SDK für Java ein
Versuchen Sie es mit RocksDB mit Java
Versuchen Sie, mit Java zu kratzen [Hinweis]
Untersucht, wie Dienste mit Watson SDK für Java aufgerufen werden
Zugriff auf den S3-Bucket mithilfe der SSE-KMS-Verschlüsselung in der EC2-IAM-Rollenumgebung (AWS SDK für Java)
[Java] Versuchen Sie, das Fizz Buzz-Problem mithilfe der rekursiven Verarbeitung zu lösen
Drei JDKs für die kostenlose kommerzielle Nutzung von Java
Versuchen Sie, Werte von Java Servlet mit JSON an die iPhone-App zu übergeben
Wie man einen revolutionären Diamanten mit Java für Aussage macht wwww
Versuchen Sie es mit Redis mit Java (jar)
Verwenden von Java mit AWS Lambda-Eclipse-Vorbereitung
Versuchen Sie, die öffentliche Java-Methode zu extrahieren
Versuchen Sie, AWS X-Ray in Java auszuführen
Versuchen Sie, Yuma in Java zu implementieren
Versuchen Sie es mit der IBM Java-Methodenverfolgung
[Java] Wo haben Sie versucht, Java zu verwenden?
Holen Sie sich eine Liste der S3-Dateien mit ListObjectsV2Request (AWS SDK für Java)
Versuchen Sie, die Microsoft Azure SDK für SQL-API von Azure Cosmos DB Service 4.6 zu verwenden
[AWS SDK für Java] Legen Sie die Wiederholungsrichtlinie für den S3-Client fest
Schnittstelle Versuchen Sie, Java-Problem TypeScript 7-3 zu machen
Versuchen Sie es mit dem Java Framework Nablarch [Web Application]
Verwenden von Java mit AWS Lambda-Implementation-Check CloudWatch-Argumenten
Versuchen Sie, Project Euler in Java zu lösen
Speichermessung für Java-Apps mit jstat
Einführung in Java zum ersten Mal # 2
Versuchen Sie, n-ary Addition in Java zu implementieren
Verwenden von Java mit AWS Lambda-Implementierung-Stop / Launch EC2
Versuchen Sie es mit der Stream-API in Java
Verwendung der Submit-Methode (Java Silver)
Über das Verfahren, damit Java funktioniert
Java lernen Versuchen Sie es mit einem Scanner oder einer Karte
Versuchen Sie es mit der JSON-Format-API in Java
Stellen Sie mit Eclipse eine Verbindung von Java zu MySQL her
Versuchen Sie es mit der REST-API von JobScheduler - Java RestClient-Implementierung -
Versuchen Sie, Java-Bibliotheken mit AWS CodeArtifact zu verwalten
Aktualisieren der Java-Umgebung von Windows mit Chocolatey
Memo für die Migration von Java nach Kotlin
Versuchen Sie es mit der Wii-Fernbedienung in Java
Versuchen Sie, mit Scala mithilfe der Standardbibliothek von Java Text zu einem Bild hinzuzufügen
Versuchen Sie, Watson NLU, die Japanisch zu unterstützen scheint, vom Java SDK aus aufzurufen
Bis Sie ein Java-Programm mit dem für Windows lokalen AWS SDK ausführen
[Java] So erhalten Sie HashMap-Elemente per Schleifensteuerung mithilfe der erweiterten for-Anweisung