En conclusion, Stream ne peut paralléliser que le Stream supérieur. Dans flatMap, etc., le traitement parallèle ne fonctionne pas pour le traitement de flux imbriqué. (J'ai également lu le code Java, mais j'ai confirmé que FlatMap Stream avait été changé en Sequential dans le code interne.)
Fondamentalement, paralléliser autant que possible le traitement de niveau supérieur est généralement efficace, cette implémentation est donc convaincante, mais elle n'est pas toujours universelle.
Par exemple, lors de l'écriture de code qui trace et traite une structure arborescente, la parallélisation peut ne pas être aussi efficace que prévu en raison du biais de la structure arborescente. Il peut être préférable de gérer la structure arborescente en premier lieu.
Le code de confirmation est ci-dessous. Pensez à avoir une tâche dans un double tableau et à la traiter dans un Stream.
package parallel;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class Test1 {
static class TestTask implements Runnable {
@Override
public void run() {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
List<List<TestTask>> table = new ArrayList<>();
int sum = 0;
int max = 0;
Random rgen = new Random(0);
for (int j = 0; j < 3; ++j) {
List<TestTask> list = IntStream.range(0, 5 + rgen.nextInt(10))
.mapToObj(i -> new TestTask())
.collect(Collectors.toList());
table.add(list);
sum += list.size();
max = Math.max(max, list.size());
}
System.out.println("table: "+ table.stream()
.map(l -> Integer.toString(l.size()))
.collect(Collectors.joining(", ")));
System.out.printf("sum: %d, max: %d\n", sum, max);
System.out.println("parallelism: "+ ForkJoinPool.commonPool().getParallelism());
System.out.println();
{
// 1.base-parallel
long t0 = System.currentTimeMillis();
table.stream()
.parallel()
.flatMap(l -> l.stream())
.forEach(task -> task.run());
System.out.printf("1.base-parallel: %,5d msec\n", (System.currentTimeMillis() - t0));
}
{
// 2.flat-map-parallel
long t0 = System.currentTimeMillis();
table.stream()
.flatMap(l -> l.parallelStream())
.forEach(task -> task.run());
System.out.printf("2.flat-map-parallel: %,5d msec\n", (System.currentTimeMillis() - t0));
}
{
// 3.both-parallel
long t0 = System.currentTimeMillis();
table.parallelStream()
.flatMap(l -> l.parallelStream()
.peek(task -> task.run())
.collect(Collectors.toList())
.stream())
.forEach(task -> {});
System.out.printf("3.both-parallel: %,5d msec\n", (System.currentTimeMillis() - t0));
}
}
}
Le résultat est ci-dessous.
table: 5, 13, 14
sum: 32, max: 14
parallelism: 3
1.base-parallel: 1,424 msec
2.flat-map-parallel: 3,281 msec
3.both-parallel: 926 msec
Regardons chaque résultat tour à tour.
Base-parallel met en parallèle uniquement la matrice supérieure. Par conséquent, il est affecté par un léger biais dans la double disposition des tâches. On peut confirmer que cela prend 14 * 100msec sous l'influence de 14 du tableau [3] .length.
En parallèle, parallèle ne fonctionne pas comme mentionné au début. Par conséquent, puisque toutes les tâches sont traitées séquentiellement, cela prend un total de 32 * 100 msec de tâches.
Dans les deux parallèles, la parallélisation est effectuée en insérant inutilement le traitement de terminaison collect dans le flatMap. En conséquence, il peut être terminé dans les plus brefs délais.
À propos, dans la réflexion confirmée, je n'ai trouvé aucune description du comportement ici même dans la description telle que Javadoc. Si vous savez qu'il y a une description quelque part, faites-le moi savoir.