[JAVA] SaveAsBinaryFile in Spark


** 2018/06/17 postscript ** I have since written an article on the right way to do this using Spark's API. For the purpose in this article's title, refer to that one instead; this article still works as an example of how to use mapPartitionsWithIndex.

Spark provides saveAsTextFile for exporting data partition by partition, but there is no saveAsBinaryFile. There is saveAsObjectFile, but it writes in Hadoop's SequenceFile format. There seems to be no API that writes binary data as-is, so I looked for a way to do it myself. I got it working, so I am writing it up here.
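
For context, here is a minimal sketch of the exporters that do exist (someRdd is a placeholder name of mine):

// Existing save APIs, neither of which writes raw bytes as-is:
someRdd.saveAsTextFile("out-text");     // each element's string form, one per line
someRdd.saveAsObjectFile("out-object"); // serialized objects inside Hadoop SequenceFiles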

Java code

Java is what I am most likely to need in the near future, so for now there is only the Java code. I may write Scala or Python versions later.

SaveBinary.java


import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function2;

import java.io.*;
import java.util.*;

public class SaveBinary {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("TestSpark"));
        //Read the image files and create a JavaRDD<byte[]>
        JavaRDD<byte[]> bytes = sc.binaryFiles("someImages/*").values().map(x -> x.toArray());
        saveAsBinaryFile(bytes, "out");
    }

    //Accepts only RDDs of byte[]
    private static void saveAsBinaryFile(JavaRDD<byte[]> bytes, String path) {

        //Implement the body of mapPartitionsWithIndex as an anonymous class
        Function2<Integer, Iterator<byte[]>, Iterator<byte[]>> writeBinary =
                new Function2<Integer, Iterator<byte[]>, Iterator<byte[]>>() {
            @Override
            public Iterator<byte[]> call(Integer idx, Iterator<byte[]> iterator) throws Exception {
                //The file name follows HDFS's part-NNNNN convention, but the stream
                //writes to an ordinary file system path (NFS in my case)
                try (FileOutputStream fos = new FileOutputStream(
                        path + "/part-" + String.format("%05d", idx))) {
                    while (iterator.hasNext()) {
                        fos.write(iterator.next());
                    }
                }
                return iterator;
            }
        };

        //Perform the write for each partition with mapPartitionsWithIndex;
        //the trailing count() is the action that forces the lazy transformation to run
        bytes.mapPartitionsWithIndex(writeBinary, false).count();
    }
}
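
One caveat: the FileOutputStream runs on the executors, so path must point at a directory that already exists and is visible from every executor (the NFS mount mentioned in the code comment). A hedged sketch of a guard that could go at the top of saveAsBinaryFile (the File/mkdirs lines are my addition, not part of the original code):

        //My addition: create the output directory up front, assuming it sits
        //on a mount (e.g. NFS) that every executor can also see
        File outDir = new File(path);
        if (!outDir.exists() && !outDir.mkdirs()) {
            throw new RuntimeException("Could not create output directory: " + path);
        }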

Commentary

mapPartitionsWithIndex operates on each partition of the RDD. Also, as the name suggests, each partition is numbered, so each one can be saved under a different file name.
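
For illustration, a minimal runnable sketch (class and variable names are mine) showing how the index argument identifies each partition:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PartitionIndexDemo {
    public static void main(String[] args) {
        //local[*] so the sketch can run without a cluster
        JavaSparkContext sc = new JavaSparkContext(
                new SparkConf().setAppName("PartitionIndexDemo").setMaster("local[*]"));
        //Six elements spread across three partitions
        JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 3);
        //Tag every element with the index of the partition it lives in
        List<String> tagged = rdd.mapPartitionsWithIndex((idx, it) -> {
            List<String> out = new ArrayList<>();
            while (it.hasNext()) {
                out.add("partition-" + idx + " -> " + it.next());
            }
            return out.iterator();
        }, false).collect();
        tagged.forEach(System.out::println);
        sc.stop();
    }
}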

The processing passed to mapPartitionsWithIndex is implemented as a Function2. The contents of the RDD arrive as an Iterator for each partition, so use it to implement whatever processing you need. Implementing it as an anonymous class, as shown above, is a clean way to do it.
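
Incidentally, Function2 is a functional interface, so on Java 8 and later the same body can also be written as a lambda. A sketch of the writeBinary logic in lambda form, as a drop-in replacement inside saveAsBinaryFile above:

        //Same logic as the anonymous class, written as a lambda (Java 8+)
        bytes.mapPartitionsWithIndex((idx, iterator) -> {
            try (FileOutputStream fos = new FileOutputStream(
                    path + "/part-" + String.format("%05d", idx))) {
                while (iterator.hasNext()) {
                    fos.write(iterator.next());
                }
            }
            return iterator;
        }, false).count();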

mapPartitionsWithIndex itself is a method that returns an RDD. Here, saveAsBinaryFile returns void, and the output of mapPartitionsWithIndex is simply discarded. However, the count() after mapPartitionsWithIndex is important: mapPartitionsWithIndex is lazily evaluated and will not execute without an action. In fact, in the code above, if you forget count(), nothing is written at all. (I got stuck on exactly that for quite a while...)
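
To make the laziness concrete, a minimal sketch (the variable name is mine) separating the two steps:

        //Transformation only: builds the execution plan, writes nothing yet
        JavaRDD<byte[]> planned = bytes.mapPartitionsWithIndex(writeBinary, false);

        //Action: triggers execution on the partitions, and the files appear.
        //Any action would do; count() is simply a cheap one.
        planned.count();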

Supplement

The hint for exporting binaries with mapPartitionsWithIndex came from saveAsTextFile. If you look at the implementation of saveAsTextFile (https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala), mapPartitions is used internally. Since mapPartitions and mapPartitionsWithIndex do almost the same work, I expect saveAsBinaryFile implemented this way to perform about the same as saveAsTextFile.

reference

StackOverflow: Apache Spark mapPartitionsWithIndex
