With Java Streams, only the top-level Stream can be parallelized.

In short, a Stream can only parallelize its top-level stream. Nested streams, such as the ones returned to flatMap, are not processed in parallel even if they were created with parallelStream(). (I also read the JDK source and confirmed that flatMap's internal implementation switches each inner stream to sequential before consuming it.)
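
One way to observe this yourself is to record which threads touch the inner elements. The sketch below assumes that thread names are a fair proxy for where the work runs; the class name FlatMapThreads and the list sizes are hypothetical. The outer stream is sequential and each inner stream is requested with parallelStream(), yet every peek() call is expected to run on the calling thread.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class FlatMapThreads {
    // Returns the names of the threads that executed the inner peek().
    public static Set<String> innerThreads() {
        // Two inner lists of 1,000 elements each (sizes are arbitrary).
        List<List<Integer>> table = Arrays.asList(
                IntStream.range(0, 1000).boxed().collect(Collectors.toList()),
                IntStream.range(0, 1000).boxed().collect(Collectors.toList()));

        // Thread-safe set, in case the inner stream did run on several threads.
        Set<String> threads = ConcurrentHashMap.newKeySet();

        List<Integer> out = table.stream()              // sequential outer stream
                .flatMap(l -> l.parallelStream()        // inner stream requested as parallel
                        .peek(x -> threads.add(Thread.currentThread().getName())))
                .collect(Collectors.toList());          // flatMap drains each inner stream sequentially

        System.out.println("elements: " + out.size());
        return threads;
    }

    public static void main(String[] args) {
        // Expected to print only the calling thread, e.g. [main].
        System.out.println("threads: " + innerThreads());
    }
}
```

If the inner streams were really parallel, common-pool worker names such as ForkJoinPool.commonPool-worker-1 would appear in the set as well.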

Parallelizing at the outermost level is usually the most efficient approach, so this implementation makes sense, but it is not the best choice in every case.

For example, when writing code that traverses and processes a tree structure, parallelization may not work as expected if the tree is unbalanced, because only the top level is split across threads. For tree structures, it may be better to choose a different way of processing them in the first place.
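
As one possible alternative (my own suggestion, not something the Stream API does for you), a tree can be walked with a RecursiveTask from java.util.concurrent, which forks work at every level of the tree instead of only below the root. The Node class, the summing workload, and the tree shape below are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class TreeSum {
    // Hypothetical tree node: a value plus any number of children.
    public static final class Node {
        final long value;
        final List<Node> children = new ArrayList<>();
        Node(long value) { this.value = value; }
    }

    // Sums a subtree, forking one subtask per child so that work is split at
    // every level of the tree, not only below the root.
    static final class SumTask extends RecursiveTask<Long> {
        private final Node node;
        SumTask(Node node) { this.node = node; }

        @Override
        protected Long compute() {
            List<SumTask> subtasks = new ArrayList<>();
            for (Node child : node.children) {
                SumTask t = new SumTask(child);
                t.fork();              // schedule the child subtree asynchronously
                subtasks.add(t);
            }
            long total = node.value;
            for (SumTask t : subtasks) {
                total += t.join();     // wait for (or help run) each child subtree
            }
            return total;
        }
    }

    public static long sum(Node root) {
        return ForkJoinPool.commonPool().invoke(new SumTask(root));
    }

    // Builds a deliberately unbalanced tree: one root child is a single leaf,
    // the other has 8 children with 8 leaves each (75 nodes in total).
    public static Node skewedTree() {
        Node root = new Node(1);
        root.children.add(new Node(1));        // light branch
        Node heavy = new Node(1);              // heavy branch
        root.children.add(heavy);
        for (int i = 0; i < 8; i++) {
            Node mid = new Node(1);
            for (int j = 0; j < 8; j++) {
                mid.children.add(new Node(1));
            }
            heavy.children.add(mid);
        }
        return root;
    }

    public static void main(String[] args) {
        System.out.println(sum(skewedTree())); // prints 75
    }
}
```

Forking one task per node only pays off when each node does real work, like the 100 msec tasks in this article; for cheap nodes, a threshold below which a subtree is processed sequentially would usually be added.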

The code I used to confirm this is below. It stores tasks in a two-level nested list (a list of lists) and processes them with a Stream.

package parallel;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class Test1 {
    
    static class TestTask implements Runnable {
        @Override
        public void run() {
            try {
                Thread.sleep(100); // simulate 100 msec of work
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    public static void main(String[] args) throws InterruptedException {
        
        List<List<TestTask>> table = new ArrayList<>();

        int sum = 0;
        int max = 0;
        Random rgen = new Random(0);
        for (int j = 0; j < 3; ++j) {
            List<TestTask> list = IntStream.range(0, 5 + rgen.nextInt(10))
                    .mapToObj(i -> new TestTask())
                    .collect(Collectors.toList());
            table.add(list);

            sum += list.size();
            max = Math.max(max, list.size());
        }

        System.out.println("table: "+ table.stream()
                .map(l -> Integer.toString(l.size()))
                .collect(Collectors.joining(", ")));
        System.out.printf("sum: %d, max: %d\n", sum, max);
        System.out.println("parallelism: "+ ForkJoinPool.commonPool().getParallelism());
        System.out.println();

        {
            // 1.base-parallel
            // Only the outer list is split; each inner list runs on a single thread.
            long t0 = System.currentTimeMillis();
            table.stream()
                    .parallel()
                    .flatMap(l -> l.stream())
                    .forEach(task -> task.run());
            System.out.printf("1.base-parallel:     %,5d msec\n", (System.currentTimeMillis() - t0));
        }

        {
            // 2.flat-map-parallel
            // flatMap consumes each inner stream sequentially, so parallelStream() has no effect here.
            long t0 = System.currentTimeMillis();
            table.stream()
                    .flatMap(l -> l.parallelStream())
                    .forEach(task -> task.run());
            System.out.printf("2.flat-map-parallel: %,5d msec\n", (System.currentTimeMillis() - t0));
        }

        {
            // 3.both-parallel
            // The inner collect is a terminal operation, so each inner parallel
            // stream is fully executed (in parallel) before flatMap receives it.
            long t0 = System.currentTimeMillis();
            table.parallelStream()
                    .flatMap(l -> l.parallelStream()
                            .peek(task -> task.run())
                            .collect(Collectors.toList())
                            .stream())
                    .forEach(task -> {});
            System.out.printf("3.both-parallel:     %,5d msec\n", (System.currentTimeMillis() - t0));
        }
    }
}

The results were as follows.

table: 5, 13, 14
sum: 32, max: 14
parallelism: 3

1.base-parallel:     1,424 msec
2.flat-map-parallel: 3,281 msec
3.both-parallel:       926 msec

Let's look at each result in turn.

  1. base-parallel parallelizes only the outer list, and each inner list is handled by a single thread. It is therefore affected by the uneven sizes of the inner lists: it takes about 14 * 100 msec, determined by the longest inner list, the third one (table.get(2).size() is 14).

  2. In flat-map-parallel, parallelism has no effect, as explained at the beginning. All tasks are processed sequentially, so it takes the total of 32 * 100 msec.

  3. both-parallel inserts an otherwise unnecessary terminal operation (collect) inside flatMap so that each inner stream actually runs in parallel. As a result, it finishes in the shortest time.
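
If the nested lists are small enough to materialize, another option that avoids the collect inside flatMap is to flatten everything into one list first and then parallelize that list, so the top-level stream's source already holds every task and an ArrayList source can be split evenly. This is a sketch of my own, not one of the variants measured above; the class name FlattenFirst, the method names, and the counting tasks are hypothetical stand-ins for TestTask.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

public class FlattenFirst {
    // Flattens the nested lists sequentially, then runs every task from one
    // parallel stream whose source already holds all of the tasks.
    public static void runAll(List<List<Runnable>> table) {
        List<Runnable> flat = table.stream()
                .flatMap(List::stream)
                .collect(Collectors.toList());
        flat.parallelStream().forEach(Runnable::run);
    }

    // Runs a small hypothetical table and returns how many tasks were executed.
    public static int demo() {
        AtomicInteger done = new AtomicInteger();
        Runnable task = done::incrementAndGet;  // stand-in for the 100 msec TestTask
        List<List<Runnable>> table = Arrays.asList(
                Arrays.asList(task, task),
                Arrays.asList(task, task, task, task, task),
                Arrays.asList(task));
        runAll(table);
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println("tasks run: " + demo()); // prints "tasks run: 8"
    }
}
```

The trade-off is an extra list holding references to every task; in exchange, the split no longer depends on how the tasks were grouped in the outer list.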

By the way, as far as I checked, I could not find this behavior described anywhere, including the Javadoc. If you know of a place where it is documented, please let me know.
