Je voulais démarrer Spark à partir de l'écran du WEB créé en Java, donc cette fois je voudrais soumettre Spark à l'aide du SDK AWS pour Java. Ça y est ...! Je n'ai pas trouvé l'article, alors je pense que je vais le laisser sur Qiita.
Cette fois, j'ai fait un JOB qui convertit simplement les données CSV en Parquet. Tout d'abord, démarrez le cluster avec EMR et exécutez-le à partir de la console de gestion. Ensuite, je voudrais automatiser ce qui suit.
La source est ici. https://github.com/uzresk/spark-samples.git
Reçoit l'URL du compartiment CSV et l'URL de destination de sortie Parquet en tant qu'arguments. Le schéma est automatiquement converti, Dataframe est lu avec l'en-tête Oui et converti en Parquet.
Java
Créez un fichier jar avec le package mvn.
Csv2Parquet.java
package jp.gr.java_conf.uzresk.samples.spark.jobs;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
public class Csv2Parquet {
public static void main(String[] args) {
String input = args[0];
String output = args[1];
SparkSession spark = SparkSession.builder().appName("Csv2ParquetJavaApplication").getOrCreate();
Dataset<Row> df = spark.read().option("header", "true").option("inferSchema", "true").csv(input);
df.write().mode(SaveMode.Overwrite).parquet(output);
}
}
Scala
En prime, je l'ai écrit en Scala. Veuillez emballer avec sbt.
Csv2Parquet.scala
package jp.gr.java_conf.uzresk.samples.spark.jobs
import org.apache.spark.sql.{SaveMode, SparkSession}
object Csv2Parquet {
def main(args: Array[String]) {
val input = args(0)
val output = args(1)
val spark = SparkSession.builder.appName("Csv2ParquetJavaAppliation").getOrCreate
val df = spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv(input)
df.write.mode(SaveMode.Overwrite).parquet(output)
}
}
Quels sont les points à prendre en compte lors du démarrage d'un cluster?
Cette fois, j'ai essayé avec environ 2 Go de données. La conversion sera terminée en 2 minutes avec 2 nœuds principaux.
Après avoir appuyé sur le bouton du cluster de démarrage, ajoutez d'autres étapes.
En cas d'échec, vous pouvez vérifier la sortie d'erreur standard en cliquant sur la flèche. Veuillez noter que rien n'apparaîtra ici si vous n'avez pas défini le journal.
Définissez JOB dans StepConfig, spécifiez l'ID de cluster et ajoutez STEP. Comme c'est le même que le contenu défini à l'écran, je pense qu'il n'y a pas beaucoup d'hésitation.
SparkSubmitSingleStepRunningCluster.java
package jp.gr.java_conf.uzresk.samples.spark.emr;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder;
import com.amazonaws.services.elasticmapreduce.model.*;
import jp.gr.java_conf.uzresk.samples.spark.emr.utils.AmazonClientConfigurationBuilder;
import jp.gr.java_conf.uzresk.samples.spark.emr.utils.Resource;
import java.util.ArrayList;
import java.util.List;
public class SparkSubmitSingleStepRunningCluster {
private static final String CLUSTER_ID = "j-1VOWW7554GD9Z";
public static void main(String[] args) {
new SparkSubmitSingleStepRunningCluster().startJob();
}
private void startJob() {
ClientConfiguration cc = AmazonClientConfigurationBuilder.clientConfiguration().orElse(new ClientConfiguration());
AmazonElasticMapReduce emr = AmazonElasticMapReduceClientBuilder.standard()
.withClientConfiguration(cc)
.withRegion(Regions.AP_NORTHEAST_1)
.build();
List<StepConfig> stepConfigs = new ArrayList<>();
stepConfigs.add(createCsv2ParquetConfig());
AddJobFlowStepsRequest req = new AddJobFlowStepsRequest(CLUSTER_ID, stepConfigs);
AddJobFlowStepsResult result = emr.addJobFlowSteps(req);
System.out.println(result.getStepIds());
}
private StepConfig createCsv2ParquetConfig() {
HadoopJarStepConfig sparkStepConf = new HadoopJarStepConfig()
.withJar("command-runner.jar")
.withArgs("spark-submit",
"--executor-memory", "1g",
"--deploy-mode", "cluster",
"--class", Resource.getString("job.class"),
Resource.getString("job.jar"),
Resource.getString("csv2parquet.csv-file"),
Resource.getString("csv2parquet.parquet-path"));
return new StepConfig()
.withName("Csv2Parquet")
.withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
.withHadoopJarStep(sparkStepConf);
}
}
Je vais l'écrire comme ça. Il existe de nombreux paramètres pour démarrer le cluster d'EMR, mais à part cela, c'est le même que l'exemple précédent.
SparkSubmitIncludingStartupCluster.java
package jp.gr.java_conf.uzresk.samples.spark.emr;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder;
import com.amazonaws.services.elasticmapreduce.model.*;
import jp.gr.java_conf.uzresk.samples.spark.emr.utils.AmazonClientConfigurationBuilder;
import jp.gr.java_conf.uzresk.samples.spark.emr.utils.Resource;
import java.util.ArrayList;
import java.util.List;
import static com.amazonaws.services.elasticmapreduce.model.InstanceRoleType.CORE;
import static com.amazonaws.services.elasticmapreduce.model.InstanceRoleType.MASTER;
public class SparkSubmitIncludingStartupCluster {
public static void main(String[] args) {
ClientConfiguration cc = AmazonClientConfigurationBuilder.clientConfiguration().orElse(new ClientConfiguration());
AmazonElasticMapReduce emr = AmazonElasticMapReduceClientBuilder.standard()
.withClientConfiguration(cc)
.withRegion(Regions.AP_NORTHEAST_1)
.build();
new SparkSubmitIncludingStartupCluster().launch(emr);
}
private void launch(AmazonElasticMapReduce emr) {
RunJobFlowRequest request = createRequest();
RunJobFlowResult result = emr.runJobFlow(request);
System.out.println(result.getJobFlowId() + " is starting");
}
private RunJobFlowRequest createRequest() {
return new RunJobFlowRequest()
.withName(Resource.getString("emr.cluster-name"))
.withReleaseLabel(Resource.getString("emr.release-label"))
.withSteps(createStepConfig())
.withApplications(createApplicationList())
.withTags(new Tag("Name", Resource.getString("emr.cluster-name")))
.withEbsRootVolumeSize(Resource.getInt("emr.ebs-root-volume-size"))
.withServiceRole(Resource.getString("emr.service-role"))
.withAutoScalingRole(Resource.getString("emr.auto-scaling-role"))
.withJobFlowRole(Resource.getString("emr.job-flow-role"))
.withLogUri(Resource.getString("emr.log-uri"))
.withVisibleToAllUsers(true)
.withInstances(createJobFlowInstancesConfig(createMasterConfig(), createCoreConfig()));
}
private List<StepConfig> createStepConfig() {
List<StepConfig> configs = new ArrayList<>();
configs.add(createDebugConfig());
configs.add(createCsv2ParquetConfig());
return configs;
}
private StepConfig createDebugConfig() {
String COMMAND_RUNNER = "command-runner.jar";
String DEBUGGING_COMMAND = "state-pusher-script";
String DEBUGGING_NAME = "Setup Hadoop Debugging";
return new StepConfig()
.withName(DEBUGGING_NAME)
.withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
.withHadoopJarStep(
new HadoopJarStepConfig().withJar(COMMAND_RUNNER).withArgs(DEBUGGING_COMMAND));
}
private StepConfig createCsv2ParquetConfig() {
HadoopJarStepConfig sparkStepConf = new HadoopJarStepConfig()
.withJar("command-runner.jar")
.withArgs("spark-submit",
"--executor-memory", "1g",
"--deploy-mode", "cluster",
"--class", Resource.getString("job.class"),
Resource.getString("job.jar"),
Resource.getString("csv2parquet.csv-file"),
Resource.getString("csv2parquet.parquet-path"));
return new StepConfig()
.withName("Csv2Parquet")
.withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
.withHadoopJarStep(sparkStepConf);
}
private ArrayList<Application> createApplicationList() {
ArrayList<Application> applications = new ArrayList<>();
applications.add(new Application().withName("Spark"));
return applications;
}
private InstanceGroupConfig createMasterConfig() {
return new InstanceGroupConfig()
.withInstanceCount(Resource.getInt("emr.master.instance-count"))
.withEbsConfiguration(
new EbsConfiguration()
.withEbsBlockDeviceConfigs(
new EbsBlockDeviceConfig()
.withVolumeSpecification(
new VolumeSpecification().withSizeInGB(Resource.getInt("emr.master.volume-size")).withVolumeType(Resource.getString("emr.master.volume-type")))
.withVolumesPerInstance(1))
.withEbsOptimized(true))
.withInstanceRole(MASTER)
.withInstanceType(Resource.getString("emr.master.instance-type"))
.withName(Resource.getString("emr.master.name"));
}
private InstanceGroupConfig createCoreConfig() {
return new InstanceGroupConfig()
.withInstanceCount(Resource.getInt("emr.core.instance-count"))
.withEbsConfiguration(
new EbsConfiguration()
.withEbsBlockDeviceConfigs(
new EbsBlockDeviceConfig()
.withVolumeSpecification(
new VolumeSpecification().withSizeInGB(Resource.getInt("emr.core.volume-size")).withVolumeType(Resource.getString("emr.core.volume-type")))
.withVolumesPerInstance(1))
.withEbsOptimized(true))
.withInstanceRole(CORE)
.withInstanceType(Resource.getString("emr.core.instance-type"))
.withName(Resource.getString("emr.core.name"));
}
private JobFlowInstancesConfig createJobFlowInstancesConfig(
InstanceGroupConfig masterConfiguration, InstanceGroupConfig coreConfiguration) {
return new JobFlowInstancesConfig()
.withInstanceGroups(masterConfiguration, coreConfiguration)
.withEc2SubnetId(Resource.getString("emr.subnet-id"))
.withEmrManagedMasterSecurityGroup(Resource.getString("emr.master-sg"))
.withEmrManagedSlaveSecurityGroup(Resource.getString("emr.slave-sg"))
.withEc2KeyName(Resource.getString("emr.key-name"))
.withKeepJobFlowAliveWhenNoSteps(fales); //Arrêtez l'instance à la fin du JOB
}
}
Si vous définissez withKeepJobFlowAliveWhenNoSteps dans la dernière ligne sur false, le cluster s'arrêtera automatiquement après l'exécution de JOB.
Le traitement par lots peut être exécuté encore moins cher en utilisant des instances ponctuelles. Par expérience, ce sera moins de la moitié du prix.
J'utilise une instance à la demande m4.xlarge pour 0,258 $. Le prix des instances ponctuelles semblait se situer entre 0,06 et 0,10, alors fixez le prix autour de 0,10 USD et démarrez-le. (Parce que je voulais sûrement le déplacer cette fois)
Si vous utilisez SDK, vous pouvez le déplacer en ajoutant 2 lignes chacune pour MASTER et CORE.
SparkSubmitIncludingStartupSpotCluster.java
private InstanceGroupConfig createMasterConfig() {
return new InstanceGroupConfig()
.withBidPrice("0.10") // $0.10
.withMarket(MarketType.SPOT) //Utiliser l'instance Spot
.withInstanceCount(Resource.getInt("emr.master.instance-count"))
.withEbsConfiguration(
new EbsConfiguration()
.withEbsBlockDeviceConfigs(
new EbsBlockDeviceConfig()
.withVolumeSpecification(
new VolumeSpecification().withSizeInGB(Resource.getInt("emr.master.volume-size")).withVolumeType(Resource.getString("emr.master.volume-type")))
.withVolumesPerInstance(1))
.withEbsOptimized(true))
.withInstanceRole(MASTER)
.withInstanceType(Resource.getString("emr.master.instance-type"))
.withName(Resource.getString("emr.master.name"));
}
private InstanceGroupConfig createCoreConfig() {
return new InstanceGroupConfig()
.withBidPrice("0.10") // $0.10
.withMarket(MarketType.SPOT) //Utiliser l'instance Spot
.withInstanceCount(Resource.getInt("emr.core.instance-count"))
.withEbsConfiguration(
new EbsConfiguration()
.withEbsBlockDeviceConfigs(
new EbsBlockDeviceConfig()
.withVolumeSpecification(
new VolumeSpecification().withSizeInGB(Resource.getInt("emr.core.volume-size")).withVolumeType(Resource.getString("emr.core.volume-type")))
.withVolumesPerInstance(1))
.withEbsOptimized(true))
.withInstanceRole(CORE)
.withInstanceType(Resource.getString("emr.core.instance-type"))
.withName(Resource.getString("emr.core.name"));
}
Vous devriez être en mesure de confirmer qu'il s'exécute sur l'instance Spot en vérifiant l'onglet "Résumé" d'EMR.
Recommended Posts