[JAVA] SaveAsBinaryFile in Spark

Inhalt dieses Artikels

** 17.06.2018 Nachtrag ** Ich habe Artikel auf dem richtigen Weg gemäß der Spark-API geschrieben. Beziehen Sie sich für den Zweck dieses Titels grundsätzlich darauf. Dieser Artikel kann als Beispiel für die Verwendung von mapPartitionsWithIndex verwendet werden.

So exportieren Sie Daten für jede Partition in Spark Ich habe ein saveAsTextFile, aber ich habe keinsaveAsBinaryFile. Es gibt ein saveAsObjectFile, das jedoch im SequenceFile-Format für Hadoop endet. Es scheint keine API zu geben, um binäre Informationen so wie sie sind zu schreiben, also suchte ich nach einer Möglichkeit, dies zu tun. Ich habe getan, was ich tun möchte, also werde ich einen Artikel schreiben.

Java-Code

Java wird in letzter Zeit benötigt, daher vorerst nur der Java-Code. Ich kann es in Zukunft in Scala oder Python schreiben.

SaveBinary.java


import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function2;

import java.io.*;
import java.util.*;

public class SaveBinary {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("TestSpark"));
        //Lesen Sie die Bilddatei und das Byte[]RDD erstellen
        JavaRDD<byte[]> bytes = sc.binaryFiles("someImages/*").values().map(x -> x.toArray());
        saveAsBinaryFile(bytes, "out");
    }

    //Akzeptiere RDD ist Byte[]Nur
    private static void saveAsBinaryFile(JavaRDD<byte[]> bytes, String path) {

        //Die anonyme Klasse implementiert den Inhalt von mapPartitionsWithIndex
        Function2 writeBinary = new Function2<Integer, Iterator<byte[]>, Iterator<byte[]>>() {
            @Override
            public Iterator<byte[]> call(Integer idx, Iterator<byte[]> iterator) throws Exception {
                //Der Name der Ausgabedatei entspricht dem HDFS-Ausgabestil, wird jedoch tatsächlich an NFS ausgegeben
                FileOutputStream fos = new FileOutputStream(path + "/part-" + String.format("%05d", idx));
                while (iterator.hasNext()) {
                    fos.write(iterator.next());
                }
                fos.close();
                return iterator;
            }
        };

        //Führen Sie Operationen für jede Partition mit mapPartitionsWithIndex aus
        bytes.mapPartitionsWithIndex(writeBinary, false).count();
    }
}

Kommentar

mapPartitionsWithIndex wird für jede Partition in der RDD ausgeführt. Wie der Name schon sagt, ist jede Partition nummeriert, sodass sie unter einem anderen Dateinamen gespeichert wird.

Der an mapPartitionsWithIndex übergebene Verarbeitungsinhalt wird als Function2 -Klasse implementiert. Da der Inhalt von RDD für jede Partition in den Iterator eingeht, implementieren Sie damit die gewünschte Verarbeitung. Ich denke, es wäre besser, es als anonyme Klasse zu implementieren, wie oben beschrieben.

mapPartitionsWithIndex selbst ist eine Methode, die RDD zurückgibt. Dieses Mal ist der Rückgabewert von "saveAsBinaryFile" auf "void" gesetzt, und die Ausgabe von "mapPartitionsWithIndex" interessiert mich überhaupt nicht. Die Anzahl nach den mapPartitionsWithIndex ist jedoch wichtig. mapPartitionsWithIndex wird zurückgestellt und nicht ohne nachfolgende Aktion ausgeführt. Wenn Sie im obigen Code "count" vergessen, wird nichts ausgegeben. (Deshalb war ich wirklich begeistert ...)

Ergänzung

Wie man eine Binärdatei mit diesem mapPartitionsWithIndex exportiert, war dieser Hinweis in saveAsTextFile. Schauen Sie sich die Implementierung von saveAsTextFile an (https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala). Darin wird Map Partitions verwendet. Ich denke, dass mapPartitions und mapPartitionsWithIndex fast der gleiche Prozess sind, also Ich denke, dass das auf diese Weise implementierte saveAsBinaryFile mit fast der gleichen Leistung funktioniert wie das saveAsTextFile.

Referenz

StackOverflow: Apache Spark mapPartitionsWithIndex

Recommended Posts

SaveAsBinaryFile in Spark
SaveAsBinaryFile mit Spark (Teil 2)
Reihenfolge der Routenauswertung in Spark
So erhalten Sie Parameter in Spark