** PostScript 17/06/2018 ** J'ai écrit Article sur la bonne voie selon l'API de Spark. Pour les besoins de ce titre, faites-y référence. Cet article peut être utilisé comme exemple d'utilisation de mapPartitionsWithIndex.
Pour exporter des données pour chaque partition dans Spark
J'ai un saveAsTextFile
, mais je n'ai pas desaveAsBinaryFile
.
Il existe un saveAsObjectFile
, mais il se termine au format SequenceFile pour Hadoop.
Il ne semble pas y avoir d'API pour écrire les informations binaires telles quelles, alors je cherchais un moyen de le faire.
J'ai pu faire ce que je voulais faire, alors je vais écrire un article.
Java est celui dont on aura le plus besoin récemment, donc pour l'instant, juste le code Java. Je pourrais l'écrire en Scala ou Python à l'avenir.
SaveBinary.java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function2;
import java.io.*;
import java.util.*;
public class SaveBinary {
public static void main(String[] args) {
JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("TestSpark"));
//Lire le fichier image et l'octet[]Créer RDD
JavaRDD<byte[]> bytes = sc.binaryFiles("someImages/*").values().map(x -> x.toArray());
saveAsBinaryFile(bytes, "out");
}
//Accepter RDD est un octet[]Seulement
private static void saveAsBinaryFile(JavaRDD<byte[]> bytes, String path) {
//La classe anonyme implémente le contenu de mapPartitionsWithIndex
Function2 writeBinary = new Function2<Integer, Iterator<byte[]>, Iterator<byte[]>>() {
@Override
public Iterator<byte[]> call(Integer idx, Iterator<byte[]> iterator) throws Exception {
//Le nom du fichier de sortie est de style de sortie HDFS, mais il est en fait sorti vers NFS
FileOutputStream fos = new FileOutputStream(path + "/part-" + String.format("%05d", idx));
while (iterator.hasNext()) {
fos.write(iterator.next());
}
fos.close();
return iterator;
}
};
//Effectuer des opérations pour chaque partition avec mapPartitionsWithIndex
bytes.mapPartitionsWithIndex(writeBinary, false).count();
}
}
mapPartitionsWithIndex
fonctionne pour chaque partition du RDD.
De plus, comme son nom l'indique, chaque partition est numérotée, elle est donc enregistrée sous un nom de fichier différent.
Le contenu de traitement passé à mapPartitionsWithIndex
est implémenté en tant que classe Function2
.
Puisque le contenu de RDD entre dans l'itérateur pour chaque partition, utilisez-le pour implémenter le traitement souhaité.
Je pense qu'il serait préférable de l'implémenter en tant que classe anonyme comme décrit ci-dessus.
mapPartitionsWithIndex
lui-même est une méthode qui renvoie RDD,
Cette fois, la valeur de retour de saveAsBinaryFile
est définie sur void, et je ne me soucie pas du tout de la sortie de mapPartitionsWithIndex
.
Cependant, le "count" après le "mapPartitionsWithIndex" est important.
mapPartitionsWithIndex
est différé et ne sera pas exécuté sans autre action.
En fait, dans le code ci-dessus, si vous oubliez count
, rien ne sera affiché. (C'est pourquoi j'étais vraiment dedans ...)
Comment exporter un binaire en utilisant ce mapPartitionsWithIndex
, cet indice était dans saveAsTextFile
.
Jetez un œil à l'implémentation de saveAsTextFile
(https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala) «Map Partitions» y est utilisé.
Je pense que mapPartitions
et mapPartitionsWithIndex
sont presque le même processus, donc
Je pense que le saveAsBinaryFile
implémenté de cette manière fonctionne avec presque les mêmes performances que le saveAsTextFile
.