** 2018/06/17 postscript ** I have since written an article on the proper way to do this with Spark's API. For the goal in this title, refer to that article instead; this one remains useful as an example of how to use mapPartitionsWithIndex.
Exporting data per partition with Spark
Spark has saveAsTextFile, but no saveAsBinaryFile.
There is saveAsObjectFile, but it writes in Hadoop's SequenceFile format.
There seemed to be no API for writing binary data as-is, so I went looking for a way to do it.
I got it working the way I wanted, so I'm writing it up.
Java is what I'm most likely to need in the near future, so for now this is Java only. I may write Scala or Python versions later.
SaveBinary.java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function2;
import java.io.*;
import java.util.*;

public class SaveBinary {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("TestSpark"));
        // Read image files and build an RDD of byte[]
        JavaRDD<byte[]> bytes = sc.binaryFiles("someImages/*").values().map(x -> x.toArray());
        saveAsBinaryFile(bytes, "out");
    }

    // Accepts only an RDD of byte[]
    private static void saveAsBinaryFile(JavaRDD<byte[]> bytes, String path) {
        // Implement the body of mapPartitionsWithIndex as an anonymous class
        Function2<Integer, Iterator<byte[]>, Iterator<byte[]>> writeBinary =
                new Function2<Integer, Iterator<byte[]>, Iterator<byte[]>>() {
            @Override
            public Iterator<byte[]> call(Integer idx, Iterator<byte[]> iterator) throws Exception {
                // Make sure the output directory exists (FileOutputStream does not create it)
                new File(path).mkdirs();
                // The file names mimic HDFS output style, but this actually writes to NFS
                FileOutputStream fos = new FileOutputStream(path + "/part-" + String.format("%05d", idx));
                while (iterator.hasNext()) {
                    fos.write(iterator.next());
                }
                fos.close();
                // The iterator is exhausted at this point, so this is effectively empty
                return iterator;
            }
        };
        // Run the write once per partition via mapPartitionsWithIndex
        bytes.mapPartitionsWithIndex(writeBinary, false).count();
    }
}
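As a quick sanity check (my own addition, not part of the original code), you can read the part files back with binaryFiles and compare total byte counts. This sketch assumes the sc and bytes variables from main above:

// Hypothetical round-trip check: bytes written should equal bytes read back.
JavaRDD<byte[]> readBack = sc.binaryFiles("out/part-*").values().map(x -> x.toArray());
long originalBytes = bytes.map(b -> (long) b.length).reduce(Long::sum);
long writtenBytes = readBack.map(b -> (long) b.length).reduce(Long::sum);
System.out.println(originalBytes == writtenBytes ? "OK" : "byte counts differ");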
mapPartitionsWithIndex operates on each partition of the RDD.
As the name suggests, each partition also gets an index, so each partition's data is saved under a different file name (here out/part-00000, out/part-00001, and so on).
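To make the behavior concrete, here is a tiny standalone illustration of mapPartitionsWithIndex (my own toy example, reusing the sc from above): each element gets tagged with the index of the partition it belongs to.

// Toy example: tag each element with its partition index.
JavaRDD<String> tagged = sc.parallelize(Arrays.asList("a", "b", "c", "d"), 2)
        .mapPartitionsWithIndex((idx, it) -> {
            List<String> out = new ArrayList<>();
            while (it.hasNext()) {
                out.add("partition " + idx + ": " + it.next());
            }
            return out.iterator();
        }, false);
tagged.collect().forEach(System.out::println);
// Prints something like: partition 0: a, partition 0: b, partition 1: c, partition 1: d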
The processing passed to mapPartitionsWithIndex is implemented as a Function2.
The contents of the RDD arrive as an Iterator for each partition, so use it to implement whatever processing you need.
I think implementing it as an anonymous class, as shown above, is the cleanest way.
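With Java 8 you can also write the same Function2 as a lambda, which is a bit shorter. A sketch under the same assumptions as the code above, with Collections.emptyIterator() standing in for the exhausted iterator:

// Same write logic as the anonymous class, written as a lambda.
Function2<Integer, Iterator<byte[]>, Iterator<byte[]>> writeBinary = (idx, iterator) -> {
    new File(path).mkdirs();
    try (FileOutputStream fos =
            new FileOutputStream(path + "/part-" + String.format("%05d", idx))) {
        while (iterator.hasNext()) {
            fos.write(iterator.next());
        }
    }
    return Collections.emptyIterator();
};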
mapPartitionsWithIndex itself is a method that returns an RDD, but this time saveAsBinaryFile returns void, so I don't care about the output of mapPartitionsWithIndex at all.
However, the count after mapPartitionsWithIndex is important: mapPartitionsWithIndex is evaluated lazily and will not run without an action.
In fact, in the code above, if you forget the count, nothing is written at all. (That is exactly where I got stuck...)
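If you would rather not rely on a dummy count at all, an action like foreachPartition runs eagerly on its own. A minimal sketch (my own variant, not from the original code); since foreachPartition does not pass the partition index, it uses TaskContext.getPartitionId() instead:

// Eager alternative: foreachPartition is an action, so no trailing count is needed.
bytes.foreachPartition(iterator -> {
    int idx = org.apache.spark.TaskContext.getPartitionId();
    new File(path).mkdirs();
    try (FileOutputStream fos =
            new FileOutputStream(path + "/part-" + String.format("%05d", idx))) {
        while (iterator.hasNext()) {
            fos.write(iterator.next());
        }
    }
});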
The hint for how to export binaries with mapPartitionsWithIndex came from saveAsTextFile.
If you look at the implementation of saveAsTextFile (https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala), it uses mapPartitions internally.
Since mapPartitions and mapPartitionsWithIndex do almost the same work, I expect a saveAsBinaryFile implemented this way to perform about the same as saveAsTextFile.