Essayez Spark Submit to EMR à l'aide du kit AWS SDK pour Java

introduction

Je voulais démarrer Spark à partir de l'écran du WEB créé en Java, donc cette fois je voudrais soumettre Spark à l'aide du SDK AWS pour Java. Ça y est ...! Je n'ai pas trouvé l'article, alors je pense que je vais le laisser sur Qiita.

Cette fois, j'ai fait un JOB qui convertit simplement les données CSV en Parquet. Tout d'abord, démarrez le cluster avec EMR et exécutez-le à partir de la console de gestion. Ensuite, je voudrais automatiser ce qui suit.

  1. Démarrez EMR
  2. Exécution du JOB
  3. Arrêt du cluster EMR

La source est ici. https://github.com/uzresk/spark-samples.git

Créer un JOB pour convertir CSV en Parquet

Reçoit l'URL du compartiment CSV et l'URL de destination de sortie Parquet en tant qu'arguments. Le schéma est automatiquement converti, Dataframe est lu avec l'en-tête Oui et converti en Parquet.

Java

Créez un fichier jar avec le package mvn.

Csv2Parquet.java


package jp.gr.java_conf.uzresk.samples.spark.jobs;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class Csv2Parquet {

    public static void main(String[] args) {

        String input = args[0];
        String output = args[1];

        SparkSession spark = SparkSession.builder().appName("Csv2ParquetJavaApplication").getOrCreate();
        Dataset<Row> df = spark.read().option("header", "true").option("inferSchema", "true").csv(input);
        df.write().mode(SaveMode.Overwrite).parquet(output);
    }
}

Scala

En prime, je l'ai écrit en Scala. Veuillez emballer avec sbt.

Csv2Parquet.scala


package jp.gr.java_conf.uzresk.samples.spark.jobs

import org.apache.spark.sql.{SaveMode, SparkSession}

object Csv2Parquet {
  def main(args: Array[String]) {

    val input = args(0)
    val output = args(1)

    val spark = SparkSession.builder.appName("Csv2ParquetJavaAppliation").getOrCreate
    val df = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv(input)
    df.write.mode(SaveMode.Overwrite).parquet(output)
  }
}

Démarrez un cluster EMR et soumettez manuellement un JOB

Quels sont les points à prendre en compte lors du démarrage d'un cluster?

    1. Le logiciel est juste Spark.

image.png

  1. Ajustez le nombre de nœuds principaux en fonction de la taille des données CSV, etc.

Cette fois, j'ai essayé avec environ 2 Go de données. La conversion sera terminée en 2 minutes avec 2 nœuds principaux.

image.png

    1. Définissons le journal

image.png

Après avoir appuyé sur le bouton du cluster de démarrage, ajoutez d'autres étapes.

    1. Ouvrez l'onglet Steps et appuyez sur le bouton "Add Step".

image.png

  1. Je vais ajouter des étapes

image.png

    1. Si le cluster est en cours d'exécution, le processus passera immédiatement de "en attente" à "en cours d'exécution".

En cas d'échec, vous pouvez vérifier la sortie d'erreur standard en cliquant sur la flèche. Veuillez noter que rien n'apparaîtra ici si vous n'avez pas défini le journal.

image.png

Ajouter STEP au cluster en cours d'exécution

Définissez JOB dans StepConfig, spécifiez l'ID de cluster et ajoutez STEP. Comme c'est le même que le contenu défini à l'écran, je pense qu'il n'y a pas beaucoup d'hésitation.

SparkSubmitSingleStepRunningCluster.java


package jp.gr.java_conf.uzresk.samples.spark.emr;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder;
import com.amazonaws.services.elasticmapreduce.model.*;
import jp.gr.java_conf.uzresk.samples.spark.emr.utils.AmazonClientConfigurationBuilder;
import jp.gr.java_conf.uzresk.samples.spark.emr.utils.Resource;

import java.util.ArrayList;
import java.util.List;

public class SparkSubmitSingleStepRunningCluster {

    private static final String CLUSTER_ID = "j-1VOWW7554GD9Z";

    public static void main(String[] args) {

        new SparkSubmitSingleStepRunningCluster().startJob();
    }

    private void startJob() {
        ClientConfiguration cc = AmazonClientConfigurationBuilder.clientConfiguration().orElse(new ClientConfiguration());
        AmazonElasticMapReduce emr = AmazonElasticMapReduceClientBuilder.standard()
                .withClientConfiguration(cc)
                .withRegion(Regions.AP_NORTHEAST_1)
                .build();

        List<StepConfig> stepConfigs = new ArrayList<>();
        stepConfigs.add(createCsv2ParquetConfig());

        AddJobFlowStepsRequest req = new AddJobFlowStepsRequest(CLUSTER_ID, stepConfigs);
        AddJobFlowStepsResult result = emr.addJobFlowSteps(req);

        System.out.println(result.getStepIds());
    }

    private StepConfig createCsv2ParquetConfig() {
        HadoopJarStepConfig sparkStepConf = new HadoopJarStepConfig()
                .withJar("command-runner.jar")
                .withArgs("spark-submit",
                        "--executor-memory", "1g",
                        "--deploy-mode", "cluster",
                        "--class", Resource.getString("job.class"),
                        Resource.getString("job.jar"),
                        Resource.getString("csv2parquet.csv-file"),
                        Resource.getString("csv2parquet.parquet-path"));
        return new StepConfig()
                .withName("Csv2Parquet")
                .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
                .withHadoopJarStep(sparkStepConf);
    }
}

Automatiser le démarrage du cluster → exécution STEP → arrêt du cluster

Je vais l'écrire comme ça. Il existe de nombreux paramètres pour démarrer le cluster d'EMR, mais à part cela, c'est le même que l'exemple précédent.

SparkSubmitIncludingStartupCluster.java


package jp.gr.java_conf.uzresk.samples.spark.emr;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder;
import com.amazonaws.services.elasticmapreduce.model.*;
import jp.gr.java_conf.uzresk.samples.spark.emr.utils.AmazonClientConfigurationBuilder;
import jp.gr.java_conf.uzresk.samples.spark.emr.utils.Resource;

import java.util.ArrayList;
import java.util.List;

import static com.amazonaws.services.elasticmapreduce.model.InstanceRoleType.CORE;
import static com.amazonaws.services.elasticmapreduce.model.InstanceRoleType.MASTER;

public class SparkSubmitIncludingStartupCluster {

    public static void main(String[] args) {
        ClientConfiguration cc = AmazonClientConfigurationBuilder.clientConfiguration().orElse(new ClientConfiguration());
        AmazonElasticMapReduce emr = AmazonElasticMapReduceClientBuilder.standard()
                .withClientConfiguration(cc)
                .withRegion(Regions.AP_NORTHEAST_1)
                .build();
        new SparkSubmitIncludingStartupCluster().launch(emr);
    }

    private void launch(AmazonElasticMapReduce emr) {

        RunJobFlowRequest request = createRequest();

        RunJobFlowResult result = emr.runJobFlow(request);

        System.out.println(result.getJobFlowId() + " is starting");

    }

    private RunJobFlowRequest createRequest() {

        return new RunJobFlowRequest()
                .withName(Resource.getString("emr.cluster-name"))
                .withReleaseLabel(Resource.getString("emr.release-label"))
                .withSteps(createStepConfig())
                .withApplications(createApplicationList())
                .withTags(new Tag("Name", Resource.getString("emr.cluster-name")))
                .withEbsRootVolumeSize(Resource.getInt("emr.ebs-root-volume-size"))
                .withServiceRole(Resource.getString("emr.service-role"))
                .withAutoScalingRole(Resource.getString("emr.auto-scaling-role"))
                .withJobFlowRole(Resource.getString("emr.job-flow-role"))
                .withLogUri(Resource.getString("emr.log-uri"))
                .withVisibleToAllUsers(true)
                .withInstances(createJobFlowInstancesConfig(createMasterConfig(), createCoreConfig()));
    }

    private List<StepConfig> createStepConfig() {
        List<StepConfig> configs = new ArrayList<>();
        configs.add(createDebugConfig());
        configs.add(createCsv2ParquetConfig());
        return configs;
    }

    private StepConfig createDebugConfig() {
        String COMMAND_RUNNER = "command-runner.jar";
        String DEBUGGING_COMMAND = "state-pusher-script";
        String DEBUGGING_NAME = "Setup Hadoop Debugging";

        return new StepConfig()
                .withName(DEBUGGING_NAME)
                .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
                .withHadoopJarStep(
                        new HadoopJarStepConfig().withJar(COMMAND_RUNNER).withArgs(DEBUGGING_COMMAND));
    }

    private StepConfig createCsv2ParquetConfig() {
        HadoopJarStepConfig sparkStepConf = new HadoopJarStepConfig()
                .withJar("command-runner.jar")
                .withArgs("spark-submit",
                        "--executor-memory", "1g",
                        "--deploy-mode", "cluster",
                        "--class", Resource.getString("job.class"),
                        Resource.getString("job.jar"),
                        Resource.getString("csv2parquet.csv-file"),
                        Resource.getString("csv2parquet.parquet-path"));
        return new StepConfig()
                .withName("Csv2Parquet")
                .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
                .withHadoopJarStep(sparkStepConf);
    }

    private ArrayList<Application> createApplicationList() {
        ArrayList<Application> applications = new ArrayList<>();
        applications.add(new Application().withName("Spark"));
        return applications;
    }

    private InstanceGroupConfig createMasterConfig() {
        return new InstanceGroupConfig()
                .withInstanceCount(Resource.getInt("emr.master.instance-count"))
                .withEbsConfiguration(
                        new EbsConfiguration()
                                .withEbsBlockDeviceConfigs(
                                        new EbsBlockDeviceConfig()
                                                .withVolumeSpecification(
                                                        new VolumeSpecification().withSizeInGB(Resource.getInt("emr.master.volume-size")).withVolumeType(Resource.getString("emr.master.volume-type")))
                                                .withVolumesPerInstance(1))
                                .withEbsOptimized(true))
                .withInstanceRole(MASTER)
                .withInstanceType(Resource.getString("emr.master.instance-type"))
                .withName(Resource.getString("emr.master.name"));
    }

    private InstanceGroupConfig createCoreConfig() {
        return new InstanceGroupConfig()
                .withInstanceCount(Resource.getInt("emr.core.instance-count"))
                .withEbsConfiguration(
                        new EbsConfiguration()
                                .withEbsBlockDeviceConfigs(
                                        new EbsBlockDeviceConfig()
                                                .withVolumeSpecification(
                                                        new VolumeSpecification().withSizeInGB(Resource.getInt("emr.core.volume-size")).withVolumeType(Resource.getString("emr.core.volume-type")))
                                                .withVolumesPerInstance(1))
                                .withEbsOptimized(true))
                .withInstanceRole(CORE)
                .withInstanceType(Resource.getString("emr.core.instance-type"))
                .withName(Resource.getString("emr.core.name"));
    }

    private JobFlowInstancesConfig createJobFlowInstancesConfig(
            InstanceGroupConfig masterConfiguration, InstanceGroupConfig coreConfiguration) {
        return new JobFlowInstancesConfig()
                .withInstanceGroups(masterConfiguration, coreConfiguration)
                .withEc2SubnetId(Resource.getString("emr.subnet-id"))
                .withEmrManagedMasterSecurityGroup(Resource.getString("emr.master-sg"))
                .withEmrManagedSlaveSecurityGroup(Resource.getString("emr.slave-sg"))
                .withEc2KeyName(Resource.getString("emr.key-name"))
                .withKeepJobFlowAliveWhenNoSteps(fales); //Arrêtez l'instance à la fin du JOB
    }
}

Si vous définissez withKeepJobFlowAliveWhenNoSteps dans la dernière ligne sur false, le cluster s'arrêtera automatiquement après l'exécution de JOB.

Utiliser des instances ponctuelles

Le traitement par lots peut être exécuté encore moins cher en utilisant des instances ponctuelles. Par expérience, ce sera moins de la moitié du prix.

J'utilise une instance à la demande m4.xlarge pour 0,258 $. Le prix des instances ponctuelles semblait se situer entre 0,06 et 0,10, alors fixez le prix autour de 0,10 USD et démarrez-le. (Parce que je voulais sûrement le déplacer cette fois)

Si vous utilisez SDK, vous pouvez le déplacer en ajoutant 2 lignes chacune pour MASTER et CORE.

SparkSubmitIncludingStartupSpotCluster.java


    private InstanceGroupConfig createMasterConfig() {
        return new InstanceGroupConfig()
                .withBidPrice("0.10")          // $0.10
                .withMarket(MarketType.SPOT)   //Utiliser l'instance Spot
                .withInstanceCount(Resource.getInt("emr.master.instance-count"))
                .withEbsConfiguration(
                        new EbsConfiguration()
                                .withEbsBlockDeviceConfigs(
                                        new EbsBlockDeviceConfig()
                                                .withVolumeSpecification(
                                                        new VolumeSpecification().withSizeInGB(Resource.getInt("emr.master.volume-size")).withVolumeType(Resource.getString("emr.master.volume-type")))
                                                .withVolumesPerInstance(1))
                                .withEbsOptimized(true))
                .withInstanceRole(MASTER)
                .withInstanceType(Resource.getString("emr.master.instance-type"))
                .withName(Resource.getString("emr.master.name"));
    }

    private InstanceGroupConfig createCoreConfig() {
        return new InstanceGroupConfig()
                .withBidPrice("0.10")          // $0.10
                .withMarket(MarketType.SPOT)   //Utiliser l'instance Spot
                .withInstanceCount(Resource.getInt("emr.core.instance-count"))
                .withEbsConfiguration(
                        new EbsConfiguration()
                                .withEbsBlockDeviceConfigs(
                                        new EbsBlockDeviceConfig()
                                                .withVolumeSpecification(
                                                        new VolumeSpecification().withSizeInGB(Resource.getInt("emr.core.volume-size")).withVolumeType(Resource.getString("emr.core.volume-type")))
                                                .withVolumesPerInstance(1))
                                .withEbsOptimized(true))
                .withInstanceRole(CORE)
                .withInstanceType(Resource.getString("emr.core.instance-type"))
                .withName(Resource.getString("emr.core.name"));
    }

Vous devriez être en mesure de confirmer qu'il s'exécute sur l'instance Spot en vérifiant l'onglet "Résumé" d'EMR.

image.png

Recommended Posts

Essayez Spark Submit to EMR à l'aide du kit AWS SDK pour Java
Crypter les données téléchargées vers S3 à l'aide du kit AWS SDK pour Java / SSE-KMS
J'ai essayé de faire fonctionner SQS en utilisant AWS Java SDK
SDK AWS pour Java 1.11.x et 2.x
[Java] Essayez de mettre en œuvre à l'aide de génériques
Essayez d'utiliser le SDK Java d'Hyperledger Iroha
Importer / télécharger / supprimer en bloc des données sur S3 à l'aide d'Amazon S3 Client Builder avec AWS SDK pour Java
Essayez une recherche similaire de recherche d'images à l'aide du SDK Java [Recherche]
[Java] [SQL Server] Se connecter à SQL Server 2017 local à l'aide de JDBC pour SQL Server
Soumettre une tâche à AWS Batch avec Java (Eclipse)
Initialisation de for Essayez de changer le problème Java en TypeScript 5-4
Essayez la recherche similaire de Recherche d'images à l'aide du SDK Java [Inscription]
Essayez de vous connecter à l'émulateur AzureCosmosDB pour Docker avec Java
Informations d'identification référencées par le kit AWS SDK for Java par défaut
Essayez de créer un environnement de développement Java à l'aide de Docker
Commande pour essayer d'utiliser Docker pour le moment
Configurer des cookies signés (pour CloudFront) avec des stratégies personnalisées à l'aide du kit AWS SDK pour Java
Essayez d'utiliser RocksDB avec Java
Essayez de gratter en utilisant Java [Note]
A étudié comment appeler des services avec Watson SDK pour Java
Accéder au compartiment S3 à l'aide du chiffrement SSE-KMS dans l'environnement de rôle IAM EC2 (AWS SDK pour Java)
[Java] Essayez de résoudre le problème de Fizz Buzz en utilisant un traitement récursif
Trois JDK à considérer pour une utilisation commerciale gratuite de Java
Essayez de transmettre des valeurs de Java Servlet à l'application iPhone à l'aide de JSON
Comment faire un diamant révolutionnaire en utilisant Java pour déclaration wwww
Essayez d'utiliser Redis avec Java (jar)
Utilisation de Java avec AWS Lambda-Eclipse Préparation
Essayez d'extraire la méthode publique de java
Essayez d'exécuter AWS X-Ray en Java
Essayez d'implémenter Yuma en Java
Essayez d'utiliser le traçage de méthode IBM Java
[Java] Où avez-vous essayé d'utiliser java
Obtenir une liste de fichiers S3 avec ListObjectsV2Request (AWS SDK for Java)
Essayez d'utiliser l'API Microsoft Azure SDK pour SQL d'Azure Cosmos DB Service 4.6
[AWS SDK for Java] Définir la stratégie de nouvelle tentative pour le client S3
Interface Essayez de créer un problème Java TypeScript 7-3
Essayez d'utiliser le framework Java Nablarch [Application Web]
Utilisation de Java avec des arguments CloudWatch AWS Lambda-Implementation-Check
Essayez de résoudre Project Euler en Java
Mesure de la mémoire pour les applications Java utilisant jstat
Introduction à Java pour la première fois # 2
Essayez d'implémenter l'ajout n-aire en Java
Utilisation de Java avec AWS Lambda-Implementation-Stop / Launch EC2
Essayez d'utiliser l'API Stream en Java
Comment utiliser la méthode de soumission (Java Silver)
À propos de la procédure pour que Java fonctionne
Étude de Java Essayez d'utiliser un scanner ou une carte
Essayez d'utiliser l'API au format JSON en Java
Connectez-vous de Java à MySQL à l'aide d'Eclipse
Essayez d'utiliser l'API REST de JobScheduler - implémentation Java RestClient--
Essayez de gérer les bibliothèques Java avec AWS CodeArtifact
Mise à jour de l'environnement Java de Windows à l'aide de Chocolatey
Mémo pour la migration de Java vers Kotlin
Essayez d'utiliser la télécommande Wii en Java
Essayez d'ajouter du texte à une image avec Scala en utilisant la bibliothèque standard de Java
Essayez d'appeler Watson NLU qui semble prendre en charge le japonais à partir du SDK Java
Jusqu'à ce que vous exécutiez un programme Java avec le SDK AWS local sur Windows
[Java] Comment obtenir des éléments HashMap par contrôle de boucle à l'aide de l'instruction Extended for