In conclusion, Stream can only parallelize the top level Stream. In flatMap etc., parallel processing does not work for nested Stream processing. (I also read the Java code, but confirmed that the FlatMap Stream was changed to Sequential in the internal code.)
Basically, parallelizing the higher-level processing as much as possible is generally efficient, so this implementation is convincing, but it is not always universal.
For example, when writing code that traces and processes a tree structure, parallelization may not work as expected due to the bias of the tree structure. It may be better to handle the tree structure in the first place.
The code for confirmation is below. Consider having a task in a double array and processing it with a Stream.
package parallel;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class Test1 {
static class TestTask implements Runnable {
@Override
public void run() {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
List<List<TestTask>> table = new ArrayList<>();
int sum = 0;
int max = 0;
Random rgen = new Random(0);
for (int j = 0; j < 3; ++j) {
List<TestTask> list = IntStream.range(0, 5 + rgen.nextInt(10))
.mapToObj(i -> new TestTask())
.collect(Collectors.toList());
table.add(list);
sum += list.size();
max = Math.max(max, list.size());
}
System.out.println("table: "+ table.stream()
.map(l -> Integer.toString(l.size()))
.collect(Collectors.joining(", ")));
System.out.printf("sum: %d, max: %d\n", sum, max);
System.out.println("parallelism: "+ ForkJoinPool.commonPool().getParallelism());
System.out.println();
{
// 1.base-parallel
long t0 = System.currentTimeMillis();
table.stream()
.parallel()
.flatMap(l -> l.stream())
.forEach(task -> task.run());
System.out.printf("1.base-parallel: %,5d msec\n", (System.currentTimeMillis() - t0));
}
{
// 2.flat-map-parallel
long t0 = System.currentTimeMillis();
table.stream()
.flatMap(l -> l.parallelStream())
.forEach(task -> task.run());
System.out.printf("2.flat-map-parallel: %,5d msec\n", (System.currentTimeMillis() - t0));
}
{
// 3.both-parallel
long t0 = System.currentTimeMillis();
table.parallelStream()
.flatMap(l -> l.parallelStream()
.peek(task -> task.run())
.collect(Collectors.toList())
.stream())
.forEach(task -> {});
System.out.printf("3.both-parallel: %,5d msec\n", (System.currentTimeMillis() - t0));
}
}
}
The result is below.
table: 5, 13, 14
sum: 32, max: 14
parallelism: 3
1.base-parallel: 1,424 msec
2.flat-map-parallel: 3,281 msec
3.both-parallel: 926 msec
Let's look at each result in turn.
Base-parallel parallelizes only the upper array. Therefore, it is affected by a slight bias in the double array of tasks. It can be confirmed that it takes 14 * 100msec under the influence of 14 of table [3] .length.
In flat-map-parallel, parallel does not work as mentioned at the beginning. Therefore, since all tasks are processed sequentially, it takes a total of 32 * 100 msec of tasks.
In both-parallel, the termination process collect is uselessly inserted in the flatMap to make it parallel. As a result, it can be completed in the shortest time.
By the way, in the confirmed reflection, I could not find any description about the behavior around here even in the description such as Javadoc. If you know that there is a description somewhere, please let me know.
Recommended Posts