** 17.06.2018 Nachtrag ** Ich habe Artikel auf dem richtigen Weg gemäß der Spark-API geschrieben. Beziehen Sie sich für den Zweck dieses Titels grundsätzlich darauf. Dieser Artikel kann als Beispiel für die Verwendung von mapPartitionsWithIndex verwendet werden.
So exportieren Sie Daten für jede Partition in Spark
Ich habe ein saveAsTextFile
, aber ich habe keinsaveAsBinaryFile
.
Es gibt ein saveAsObjectFile
, das jedoch im SequenceFile-Format für Hadoop endet.
Es scheint keine API zu geben, um binäre Informationen so wie sie sind zu schreiben, also suchte ich nach einer Möglichkeit, dies zu tun.
Ich habe getan, was ich tun möchte, also werde ich einen Artikel schreiben.
Java wird in letzter Zeit benötigt, daher vorerst nur der Java-Code. Ich kann es in Zukunft in Scala oder Python schreiben.
SaveBinary.java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function2;
import java.io.*;
import java.util.*;
public class SaveBinary {
public static void main(String[] args) {
JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("TestSpark"));
//Lesen Sie die Bilddatei und das Byte[]RDD erstellen
JavaRDD<byte[]> bytes = sc.binaryFiles("someImages/*").values().map(x -> x.toArray());
saveAsBinaryFile(bytes, "out");
}
//Akzeptiere RDD ist Byte[]Nur
private static void saveAsBinaryFile(JavaRDD<byte[]> bytes, String path) {
//Die anonyme Klasse implementiert den Inhalt von mapPartitionsWithIndex
Function2 writeBinary = new Function2<Integer, Iterator<byte[]>, Iterator<byte[]>>() {
@Override
public Iterator<byte[]> call(Integer idx, Iterator<byte[]> iterator) throws Exception {
//Der Name der Ausgabedatei entspricht dem HDFS-Ausgabestil, wird jedoch tatsächlich an NFS ausgegeben
FileOutputStream fos = new FileOutputStream(path + "/part-" + String.format("%05d", idx));
while (iterator.hasNext()) {
fos.write(iterator.next());
}
fos.close();
return iterator;
}
};
//Führen Sie Operationen für jede Partition mit mapPartitionsWithIndex aus
bytes.mapPartitionsWithIndex(writeBinary, false).count();
}
}
mapPartitionsWithIndex
wird für jede Partition in der RDD ausgeführt.
Wie der Name schon sagt, ist jede Partition nummeriert, sodass sie unter einem anderen Dateinamen gespeichert wird.
Der an mapPartitionsWithIndex
übergebene Verarbeitungsinhalt wird als Function2
-Klasse implementiert.
Da der Inhalt von RDD für jede Partition in den Iterator eingeht, implementieren Sie damit die gewünschte Verarbeitung.
Ich denke, es wäre besser, es als anonyme Klasse zu implementieren, wie oben beschrieben.
mapPartitionsWithIndex
selbst ist eine Methode, die RDD zurückgibt.
Dieses Mal ist der Rückgabewert von "saveAsBinaryFile" auf "void" gesetzt, und die Ausgabe von "mapPartitionsWithIndex" interessiert mich überhaupt nicht.
Die Anzahl nach den mapPartitionsWithIndex ist jedoch wichtig.
mapPartitionsWithIndex
wird zurückgestellt und nicht ohne nachfolgende Aktion ausgeführt.
Wenn Sie im obigen Code "count" vergessen, wird nichts ausgegeben. (Deshalb war ich wirklich begeistert ...)
Wie man eine Binärdatei mit diesem mapPartitionsWithIndex
exportiert, war dieser Hinweis in saveAsTextFile
.
Schauen Sie sich die Implementierung von saveAsTextFile
an (https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala). Darin wird Map Partitions
verwendet.
Ich denke, dass mapPartitions
und mapPartitionsWithIndex
fast der gleiche Prozess sind, also
Ich denke, dass das auf diese Weise implementierte saveAsBinaryFile
mit fast der gleichen Leistung funktioniert wie das saveAsTextFile
.