Zusammenfassend kann Stream nur den obersten Stream parallelisieren. In flatMap usw. funktioniert die Parallelverarbeitung nicht für die Verarbeitung verschachtelter Streams. (Ich habe auch den Java-Code gelesen, aber bestätigt, dass der FlatMap-Stream im internen Code in Sequential geändert wurde.)
Grundsätzlich ist es im Allgemeinen effizient, die übergeordnete Verarbeitung so weit wie möglich zu parallelisieren. Daher ist diese Implementierung überzeugend, aber nicht immer universell.
Wenn Sie beispielsweise Code schreiben, der eine Baumstruktur verfolgt und verarbeitet, ist die Parallelisierung aufgrund der Verzerrung der Baumstruktur möglicherweise nicht so effektiv wie erwartet. Es kann besser sein, zuerst mit der Baumstruktur umzugehen.
Der Code zur Bestätigung ist unten. Erwägen Sie, eine Aufgabe in einem doppelten Array zu haben und sie mit einem Stream zu verarbeiten.
package parallel;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class Test1 {
static class TestTask implements Runnable {
@Override
public void run() {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
List<List<TestTask>> table = new ArrayList<>();
int sum = 0;
int max = 0;
Random rgen = new Random(0);
for (int j = 0; j < 3; ++j) {
List<TestTask> list = IntStream.range(0, 5 + rgen.nextInt(10))
.mapToObj(i -> new TestTask())
.collect(Collectors.toList());
table.add(list);
sum += list.size();
max = Math.max(max, list.size());
}
System.out.println("table: "+ table.stream()
.map(l -> Integer.toString(l.size()))
.collect(Collectors.joining(", ")));
System.out.printf("sum: %d, max: %d\n", sum, max);
System.out.println("parallelism: "+ ForkJoinPool.commonPool().getParallelism());
System.out.println();
{
// 1.base-parallel
long t0 = System.currentTimeMillis();
table.stream()
.parallel()
.flatMap(l -> l.stream())
.forEach(task -> task.run());
System.out.printf("1.base-parallel: %,5d msec\n", (System.currentTimeMillis() - t0));
}
{
// 2.flat-map-parallel
long t0 = System.currentTimeMillis();
table.stream()
.flatMap(l -> l.parallelStream())
.forEach(task -> task.run());
System.out.printf("2.flat-map-parallel: %,5d msec\n", (System.currentTimeMillis() - t0));
}
{
// 3.both-parallel
long t0 = System.currentTimeMillis();
table.parallelStream()
.flatMap(l -> l.parallelStream()
.peek(task -> task.run())
.collect(Collectors.toList())
.stream())
.forEach(task -> {});
System.out.printf("3.both-parallel: %,5d msec\n", (System.currentTimeMillis() - t0));
}
}
}
Das Ergebnis ist unten.
table: 5, 13, 14
sum: 32, max: 14
parallelism: 3
1.base-parallel: 1,424 msec
2.flat-map-parallel: 3,281 msec
3.both-parallel: 926 msec
Schauen wir uns jedes Ergebnis der Reihe nach an.
Basisparallel parallelisiert nur das obere Array. Daher ist es von einer leichten Verzerrung bei der doppelten Anordnung der Aufgaben betroffen. Es kann bestätigt werden, dass es unter dem Einfluss von 14 der Tabelle [3] .length 14 * 100 ms dauert.
In Flat-Map-Parallel funktioniert Parallel nicht wie eingangs erwähnt. Da alle Aufgaben nacheinander verarbeitet werden, sind insgesamt 32 * 100 ms Aufgaben erforderlich.
In beiden Parallelen wird die Parallelisierung durchgeführt, indem die Abschlussverarbeitungssammlung unbrauchbar in die flatMap eingefügt wird. Dadurch kann es in kürzester Zeit abgeschlossen werden.
Übrigens konnte ich in der bestätigten Reflexion selbst in der Beschreibung wie Javadoc keine Beschreibung des Verhaltens hier finden. Wenn Sie wissen, dass es irgendwo eine Beschreibung gibt, lassen Sie es mich bitte wissen.
Recommended Posts