[JAVA] SaveAsBinaryFile mit Spark (Teil 2)

Inhalt dieses Artikels

In Vorheriger Artikel habe ich geschrieben, wie eine Binärzeichenfolge exportiert wird, während Spark verwendet wird. Es gab einen einfacheren Weg als den Export mit mapPartitionsWithIndex. Ich konnte mit der vorherigen Methode nicht nach HDFS exportieren. Dieses Mal kann ich nach HDFS exportieren und ich denke, es ist der "richtige" Weg. Ich hoffe, Sie können sich den vorherigen Artikel als Verwendung von mapPartitionsWithIndex vorstellen.

Java-Code

Auch diesmal werden wir es in Java tun.

SaveBinary.java


import java.io.*;
import java.util.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;

public class SaveBinary {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(new SparkConf());

        //Machen Sie RDD von Bytes entsprechend
        ArrayList<byte[]> bytes = new ArrayList<>();
        for (int i=0; i<10; i++) {
            byte[] b = new byte[10];
            for (int j=0; j<10; j++) {
                b[j] = (byte)(i*10+j);
            }
            bytes.add(b);
        }
        JavaRDD<byte[]> rdd = sc.parallelize(bytes, 2);
        /*Bild in RDD
            rdd[0] =  0,  1,  2,  3,  4,  5,  6,  7,  8,  9
            rdd[1] = 10, 11, 12, 13, 14, 15, 16, 17, 18, 19
・ ・ ・
        */
        // byte[]Verwenden Sie für BytesWritable NullWritable als Wert, um JavaPairRDD zu erstellen
       rdd.mapToPair(x->new Tuple2<>(new BytesWritable(x),NullWritable.get()))
          //Geben Sie BytesOutputFormat (selbst erstellt) an und führen Sie saveAsNewAPIHadoopFile aus
          .saveAsNewAPIHadoopFile("./out", BytesWritable.class, NullWritable.class, BytesOutputFormat.class);
    }
}

BytesOutputFormat.java


import java.io.*;
import java.util.*;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;

public class BytesOutputFormat extends SequenceFileOutputFormat<BytesWritable,NullWritable> {
    @Override
    public RecordWriter<BytesWritable,NullWritable> getRecordWriter(org.apache.hadoop.mapreduce.TaskAttemptContext job) throws IOException {
        //BytesOutputFormat ruft nur BytesRecordWriter auf
        BytesRecordWriter writer = new BytesRecordWriter(job);
        return writer;
    }
}

ByteRecordWriter.java


import java.io.*;
import java.util.*;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.RecordWriter;

public class BytesRecordWriter extends RecordWriter<BytesWritable,NullWritable> {
    private boolean saveToHdfs_ = true;
    private OutputStream os_;
    public BytesRecordWriter(org.apache.hadoop.mapreduce.TaskAttemptContext job) throws IOException {
        // SaveBinary.Das von Java angegebene Speicherziel wird hier gespeichert
        String outDir = job.getConfiguration().get("mapreduce.output.fileoutputformat.outputdir");
        String taskId = job.getTaskAttemptID().getTaskID().toString();
        //Auch wenn HDFS oder NFS unterschiedlich ist, os_Ändern Sie einfach die Klasse
        if (saveToHdfs_) {
            FileSystem hdfs = FileSystem.get(job.getConfiguration());
            os_ = hdfs.create(new Path(outDir + "/part-" + taskId.substring(taskId.length()-6)));
        } else {
            os_ = new FileOutputStream(outDir + "/part-" + taskId.substring(taskId.length()-6));
        }
    }
    @Override
    public void write(BytesWritable key, NullWritable value) throws IOException {
        os_.write(key.getBytes(), 0, key.getLength());
    }

    @Override
    public void close(org.apache.hadoop.mapreduce.TaskAttemptContext context) throws IOException {
        os_.close();
        os_ = null;
    }
}

Kommentar

JavaPairRDD verfügt also über eine Hadoop-Speichermethode Speichern Sie mit saveAsNewAPIHadoopFile. (Einige von ihnen haben keine "Neue API", aber ich verstehe den Unterschied nicht wirklich. Ich denke, Sie sollten die neue verwenden.)

Da saveAsNewAPIHadoopFile OutputFormat angeben kann, Erstellen Sie anschließend einen RecordWriter gemäß dem Format, das Sie speichern möchten, und geben Sie das OutputFormat an, das übergeben werden soll.

Dieser Bereich kann für diejenigen verwirrend sein, die das Ausgabeformat von Hadoop noch nie implementiert haben. Wenn Sie sich den obigen Code ansehen, wissen Sie sofort, was Sie tun. Sie haben gerade einen OutputStream erstellt und die Methode "write" wird für jedes Element aufgerufen.

Ausgabeergebnis

Ich werde das Ausgabeergebnis vorerst zeigen. Dieses Mal habe ich die Partition absichtlich auf 2 gesetzt und ausgeführt, sodass es zwei Ausgabedateien gibt.

out/part-000000


00 01 02 03 04 05 06 07  08 09 0A 0B 0C 0D 0E 0F
10 11 12 13 14 15 16 17  18 19 1A 1B 1C 1D 1E 1F
20 21 22 23 24 25 26 27  28 29 2A 2B 2C 2D 2E 2F
30 31

out/part-000001


32 33 34 35 36 37 38 39  3A 3B 3C 3D 3E 3F 40 41
42 43 44 45 46 47 48 49  4A 4B 4C 4D 4E 4F 50 51
52 53 54 55 56 57 58 59  5A 5B 5C 5D 5E 5F 60 61
62 63

Sie können sehen, dass es ohne zusätzliche Daten ausgegeben wurde. Abgesehen davon, da die Daten bei "Teil-000000" und "Teil-000001" kontinuierlich sind, Wenn Sie diese Dateien "katzen", erhalten Sie dieselbe Datei, die als 1 Partition ausgegeben wurde.

Recommended Posts

SaveAsBinaryFile mit Spark (Teil 2)
SaveAsBinaryFile in Spark
Einfacher Mikroservice mit Spark Framework!
Bean-Mapping mit MapStruct Teil 3
Bean-Mapping mit MapStruct Teil 2
Java mit Ramen lernen [Teil 1]
REST-API-Test mit REST Assured Part 2
Datenverknüpfung mit Spark und Cassandra
Serververarbeitung mit Java (Einführung Teil.1)
Wiedereinführung in Betreiber mit RxJava Teil 1
Einführung in Java ab 0 Teil 1
AWS Lambda (Lambda) Teil 1 mit Java startet jetzt
Extrahieren Sie einen Teil einer Zeichenfolge in Ruby
Installieren Sie Docker mit WSL2 Memo ([Teil 2] Docker-Einführung)