[JAVA] SaveAsBinaryFile dans Spark

Contenu de cet article

** PostScript 17/06/2018 ** J'ai écrit Article sur la bonne voie selon l'API de Spark. Pour les besoins de ce titre, faites-y référence. Cet article peut être utilisé comme exemple d'utilisation de mapPartitionsWithIndex.

Pour exporter des données pour chaque partition dans Spark J'ai un saveAsTextFile, mais je n'ai pas desaveAsBinaryFile. Il existe un saveAsObjectFile, mais il se termine au format SequenceFile pour Hadoop. Il ne semble pas y avoir d'API pour écrire les informations binaires telles quelles, alors je cherchais un moyen de le faire. J'ai pu faire ce que je voulais faire, alors je vais écrire un article.

Code Java

Java est celui dont on aura le plus besoin récemment, donc pour l'instant, juste le code Java. Je pourrais l'écrire en Scala ou Python à l'avenir.

SaveBinary.java


import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function2;

import java.io.*;
import java.util.*;

public class SaveBinary {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("TestSpark"));
        //Lire le fichier image et l'octet[]Créer RDD
        JavaRDD<byte[]> bytes = sc.binaryFiles("someImages/*").values().map(x -> x.toArray());
        saveAsBinaryFile(bytes, "out");
    }

    //Accepter RDD est un octet[]Seulement
    private static void saveAsBinaryFile(JavaRDD<byte[]> bytes, String path) {

        //La classe anonyme implémente le contenu de mapPartitionsWithIndex
        Function2 writeBinary = new Function2<Integer, Iterator<byte[]>, Iterator<byte[]>>() {
            @Override
            public Iterator<byte[]> call(Integer idx, Iterator<byte[]> iterator) throws Exception {
                //Le nom du fichier de sortie est de style de sortie HDFS, mais il est en fait sorti vers NFS
                FileOutputStream fos = new FileOutputStream(path + "/part-" + String.format("%05d", idx));
                while (iterator.hasNext()) {
                    fos.write(iterator.next());
                }
                fos.close();
                return iterator;
            }
        };

        //Effectuer des opérations pour chaque partition avec mapPartitionsWithIndex
        bytes.mapPartitionsWithIndex(writeBinary, false).count();
    }
}

Commentaire

mapPartitionsWithIndex fonctionne pour chaque partition du RDD. De plus, comme son nom l'indique, chaque partition est numérotée, elle est donc enregistrée sous un nom de fichier différent.

Le contenu de traitement passé à mapPartitionsWithIndex est implémenté en tant que classe Function2. Puisque le contenu de RDD entre dans l'itérateur pour chaque partition, utilisez-le pour implémenter le traitement souhaité. Je pense qu'il serait préférable de l'implémenter en tant que classe anonyme comme décrit ci-dessus.

mapPartitionsWithIndex lui-même est une méthode qui renvoie RDD, Cette fois, la valeur de retour de saveAsBinaryFile est définie sur void, et je ne me soucie pas du tout de la sortie de mapPartitionsWithIndex. Cependant, le "count" après le "mapPartitionsWithIndex" est important. mapPartitionsWithIndex est différé et ne sera pas exécuté sans autre action. En fait, dans le code ci-dessus, si vous oubliez count, rien ne sera affiché. (C'est pourquoi j'étais vraiment dedans ...)

Supplément

Comment exporter un binaire en utilisant ce mapPartitionsWithIndex, cet indice était dans saveAsTextFile. Jetez un œil à l'implémentation de saveAsTextFile (https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala) «Map Partitions» y est utilisé. Je pense que mapPartitions et mapPartitionsWithIndex sont presque le même processus, donc Je pense que le saveAsBinaryFile implémenté de cette manière fonctionne avec presque les mêmes performances que le saveAsTextFile.

référence

StackOverflow: Apache Spark mapPartitionsWithIndex

Recommended Posts

SaveAsBinaryFile dans Spark
SaveAsBinaryFile avec Spark (partie 2)
Ordre d'évaluation de l'itinéraire dans Spark
Comment obtenir des paramètres dans Spark