[JAVA] SaveAsBinaryFile avec Spark (partie 2)

Contenu de cet article

Dans Article précédent, j'ai écrit comment exporter une chaîne binaire car elle utilise Spark. Il y avait un moyen plus simple que d'exporter à l'aide de mapPartitionsWithIndex. Je ne pouvais pas exporter vers HDFS avec la méthode précédente, Cette fois, je peux exporter vers HDFS et je pense que c'est la "bonne" façon. J'espère que vous pouvez penser à l'article précédent comme à l'utilisation de mapPartitionsWithIndex.

Code Java

Cette fois aussi, nous le ferons en Java.

SaveBinary.java


import java.io.*;
import java.util.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;

public class SaveBinary {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(new SparkConf());

        //Faire RDD d'octets de manière appropriée
        ArrayList<byte[]> bytes = new ArrayList<>();
        for (int i=0; i<10; i++) {
            byte[] b = new byte[10];
            for (int j=0; j<10; j++) {
                b[j] = (byte)(i*10+j);
            }
            bytes.add(b);
        }
        JavaRDD<byte[]> rdd = sc.parallelize(bytes, 2);
        /*Image en RDD
            rdd[0] =  0,  1,  2,  3,  4,  5,  6,  7,  8,  9
            rdd[1] = 10, 11, 12, 13, 14, 15, 16, 17, 18, 19
・ ・ ・
        */
        // byte[]Pour BytesWritable, utilisez NullWritable comme valeur pour rendre JavaPairRDD
       rdd.mapToPair(x->new Tuple2<>(new BytesWritable(x),NullWritable.get()))
          //Spécifiez BytesOutputFormat (self-made) et effectuez saveAsNewAPIHadoopFile
          .saveAsNewAPIHadoopFile("./out", BytesWritable.class, NullWritable.class, BytesOutputFormat.class);
    }
}

BytesOutputFormat.java


import java.io.*;
import java.util.*;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;

public class BytesOutputFormat extends SequenceFileOutputFormat<BytesWritable,NullWritable> {
    @Override
    public RecordWriter<BytesWritable,NullWritable> getRecordWriter(org.apache.hadoop.mapreduce.TaskAttemptContext job) throws IOException {
        //BytesOutputFormat appelle simplement BytesRecordWriter
        BytesRecordWriter writer = new BytesRecordWriter(job);
        return writer;
    }
}

ByteRecordWriter.java


import java.io.*;
import java.util.*;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.RecordWriter;

public class BytesRecordWriter extends RecordWriter<BytesWritable,NullWritable> {
    private boolean saveToHdfs_ = true;
    private OutputStream os_;
    public BytesRecordWriter(org.apache.hadoop.mapreduce.TaskAttemptContext job) throws IOException {
        // SaveBinary.La destination de sauvegarde spécifiée par java est stockée ici
        String outDir = job.getConfiguration().get("mapreduce.output.fileoutputformat.outputdir");
        String taskId = job.getTaskAttemptID().getTaskID().toString();
        //Même si HDFS ou NFS est différent, os_Changez juste la classe
        if (saveToHdfs_) {
            FileSystem hdfs = FileSystem.get(job.getConfiguration());
            os_ = hdfs.create(new Path(outDir + "/part-" + taskId.substring(taskId.length()-6)));
        } else {
            os_ = new FileOutputStream(outDir + "/part-" + taskId.substring(taskId.length()-6));
        }
    }
    @Override
    public void write(BytesWritable key, NullWritable value) throws IOException {
        os_.write(key.getBytes(), 0, key.getLength());
    }

    @Override
    public void close(org.apache.hadoop.mapreduce.TaskAttemptContext context) throws IOException {
        os_.close();
        os_ = null;
    }
}

Commentaire

JavaPairRDD a une méthode de sauvegarde Hadoop, donc Enregistrez en utilisant saveAsNewAPIHadoopFile. (Certains d'entre eux n'ont pas de "Nouvelle API", mais je ne comprends pas vraiment la différence. Je pense que vous devriez utiliser la nouvelle.)

Puisque saveAsNewAPIHadoopFile peut spécifier OutputFormat, Après cela, créez un RecordWriter en fonction du format que vous souhaitez enregistrer et spécifiez OutputFormat pour le transmettre.

Cette zone peut prêter à confusion pour ceux qui n'ont jamais implémenté le format de sortie de Hadoop. Si vous regardez le code ci-dessus, vous saurez tout de suite ce que vous faites. Vous venez de créer un OutputStream et la méthode write est appelée pour chaque élément.

Résultat de sortie

Je vais montrer le résultat de sortie pour le moment. Cette fois, j'ai volontairement défini la partition sur 2 et l'ai exécutée, il y a donc deux fichiers de sortie.

out/part-000000


00 01 02 03 04 05 06 07  08 09 0A 0B 0C 0D 0E 0F
10 11 12 13 14 15 16 17  18 19 1A 1B 1C 1D 1E 1F
20 21 22 23 24 25 26 27  28 29 2A 2B 2C 2D 2E 2F
30 31

out/part-000001


32 33 34 35 36 37 38 39  3A 3B 3C 3D 3E 3F 40 41
42 43 44 45 46 47 48 49  4A 4B 4C 4D 4E 4F 50 51
52 53 54 55 56 57 58 59  5A 5B 5C 5D 5E 5F 60 61
62 63

Vous pouvez voir qu'il a été produit sans aucune donnée supplémentaire. En passant, puisque les données sont continues à «part-000000» et «part-000001», Si vous «cat» ces fichiers, vous obtiendrez le même fichier qui a été produit comme une partition.

Recommended Posts

SaveAsBinaryFile avec Spark (partie 2)
SaveAsBinaryFile dans Spark
Micro service facile avec Spark Framework!
Cartographie Bean avec MapStruct Partie 3
Cartographie de Bean avec MapStruct Partie 2
Java pour apprendre avec les ramen [Partie 1]
Test de l'API REST à l'aide de REST Assured Part 2
Liaison de données avec Spark et Cassandra
Traitement serveur avec Java (Introduction partie 1)
Réintroduction aux opérateurs avec RxJava Partie 1
Introduction à Java à partir de 0 Partie 1
AWS Lambda (Lambda) Partie 1 avec Java pour démarrer maintenant
Extraire une partie d'une chaîne en Ruby
Installer Docker avec WSL2 Memo ([Partie 2] Introduction à Docker)