Nur der Stream der obersten Ebene kann mit Java Stream parallelisiert werden.

Zusammenfassend kann Stream nur den obersten Stream parallelisieren. In flatMap usw. funktioniert die Parallelverarbeitung nicht für die Verarbeitung verschachtelter Streams. (Ich habe auch den Java-Code gelesen, aber bestätigt, dass der FlatMap-Stream im internen Code in Sequential geändert wurde.)

Grundsätzlich ist es im Allgemeinen effizient, die übergeordnete Verarbeitung so weit wie möglich zu parallelisieren. Daher ist diese Implementierung überzeugend, aber nicht immer universell.

Wenn Sie beispielsweise Code schreiben, der eine Baumstruktur verfolgt und verarbeitet, ist die Parallelisierung aufgrund der Verzerrung der Baumstruktur möglicherweise nicht so effektiv wie erwartet. Es kann besser sein, zuerst mit der Baumstruktur umzugehen.

Der Code zur Bestätigung ist unten. Erwägen Sie, eine Aufgabe in einem doppelten Array zu haben und sie mit einem Stream zu verarbeiten.

package parallel;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class Test1 {
    
    static class TestTask implements Runnable {
        @Override
        public void run() {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    public static void main(String[] args) throws InterruptedException {
        
        List<List<TestTask>> table = new ArrayList<>();

        int sum = 0;
        int max = 0;
        Random rgen = new Random(0);
        for (int j = 0; j < 3; ++j) {
            List<TestTask> list = IntStream.range(0, 5 + rgen.nextInt(10))
                    .mapToObj(i -> new TestTask())
                    .collect(Collectors.toList());
            table.add(list);

            sum += list.size();
            max = Math.max(max, list.size());
        }

        System.out.println("table: "+ table.stream()
                .map(l -> Integer.toString(l.size()))
                .collect(Collectors.joining(", ")));
        System.out.printf("sum: %d, max: %d\n", sum, max);
        System.out.println("parallelism: "+ ForkJoinPool.commonPool().getParallelism());
        System.out.println();

        {
            // 1.base-parallel
            long t0 = System.currentTimeMillis();
            table.stream()
                    .parallel()
                    .flatMap(l -> l.stream())
                    .forEach(task -> task.run());
            System.out.printf("1.base-parallel:     %,5d msec\n", (System.currentTimeMillis() - t0));
        }

        {
            // 2.flat-map-parallel
            long t0 = System.currentTimeMillis();
            table.stream()
                    .flatMap(l -> l.parallelStream())
                    .forEach(task -> task.run());
            System.out.printf("2.flat-map-parallel: %,5d msec\n", (System.currentTimeMillis() - t0));
        }

        {
            // 3.both-parallel
            long t0 = System.currentTimeMillis();
            table.parallelStream()
            .flatMap(l -> l.parallelStream()
                    .peek(task -> task.run())
                    .collect(Collectors.toList())
                    .stream())
                    .forEach(task -> {});
            System.out.printf("3.both-parallel:     %,5d msec\n", (System.currentTimeMillis() - t0));
        }
    }
}

Das Ergebnis ist unten.

table: 5, 13, 14
sum: 32, max: 14
parallelism: 3

1.base-parallel:     1,424 msec
2.flat-map-parallel: 3,281 msec
3.both-parallel:       926 msec

Schauen wir uns jedes Ergebnis der Reihe nach an.

  1. Basisparallel parallelisiert nur das obere Array. Daher ist es von einer leichten Verzerrung bei der doppelten Anordnung der Aufgaben betroffen. Es kann bestätigt werden, dass es unter dem Einfluss von 14 der Tabelle [3] .length 14 * 100 ms dauert.

  2. In Flat-Map-Parallel funktioniert Parallel nicht wie eingangs erwähnt. Da alle Aufgaben nacheinander verarbeitet werden, sind insgesamt 32 * 100 ms Aufgaben erforderlich.

  3. In beiden Parallelen wird die Parallelisierung durchgeführt, indem die Abschlussverarbeitungssammlung unbrauchbar in die flatMap eingefügt wird. Dadurch kann es in kürzester Zeit abgeschlossen werden.

Übrigens konnte ich in der bestätigten Reflexion selbst in der Beschreibung wie Javadoc keine Beschreibung des Verhaltens hier finden. Wenn Sie wissen, dass es irgendwo eine Beschreibung gibt, lassen Sie es mich bitte wissen.

Recommended Posts

Nur der Stream der obersten Ebene kann mit Java Stream parallelisiert werden.
Die Java 8 Stream-Verarbeitung kann bisher weggelassen werden!
Ersetzen Sie nur einen Teil des URL-Hosts durch Java
Vier Reihen mit Schwerkraft, die auf der Konsole gespielt werden können
Java Stream kann nicht wiederverwendet werden.
Stellen Sie sicher, dass Sie das Java compareTo-Ergebnis mit 0 vergleichen
Java-Argumente, die mit gradle run ausgeführt werden sollen, können mit --args angegeben werden (seit Gradle 4.9).
[Java 8] Doppelte Löschung (& doppelte Überprüfung) mit Stream
[java8] Um die Stream-API zu verstehen
[Java] Elementexistenzprüfung mit Stream
Folgen Sie dem Link mit Selen (Java)
Java8-Listenkonvertierung mit Stream Map
Stellen Sie die Zugriffslast ein, die mit JMeter grafisch geändert werden kann (Teil 2).
Stellen Sie die Zugriffslast ein, die mit JMeter grafisch geändert werden kann (Teil 1).
[Rails] "Pry-Rails", die beim Speichern mit der create-Methode verwendet werden können
Versuchen Sie es mit der Stream-API in Java
Versuchen Sie es mit der Wii-Fernbedienung in Java
[Java] Ermitteln Sie das Datum mit der LocalDateTime-Klasse
Inkrementiert durch das dritte Argument der iterierten Methode der aus Java9 hinzugefügten Stream-Klasse
[Java] Implementierte einen strengen Zeilenvorschub-Stream-Reader, der Line nur mit CrLf liest.
Seien Sie vorsichtig mit Anfragen und Antworten, wenn Sie das Serverless Framework mit Java verwenden