Seul le flux de niveau supérieur peut être parallélisé avec Java Stream.

En conclusion, Stream ne peut paralléliser que le Stream supérieur. Dans flatMap, etc., le traitement parallèle ne fonctionne pas pour le traitement de flux imbriqué. (J'ai également lu le code Java, mais j'ai confirmé que FlatMap Stream avait été changé en Sequential dans le code interne.)

Fondamentalement, paralléliser autant que possible le traitement de niveau supérieur est généralement efficace, cette implémentation est donc convaincante, mais elle n'est pas toujours universelle.

Par exemple, lors de l'écriture de code qui trace et traite une structure arborescente, la parallélisation peut ne pas être aussi efficace que prévu en raison du biais de la structure arborescente. Il peut être préférable de gérer la structure arborescente en premier lieu.

Le code de confirmation est ci-dessous. Pensez à avoir une tâche dans un double tableau et à la traiter dans un Stream.

package parallel;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class Test1 {
    
    static class TestTask implements Runnable {
        @Override
        public void run() {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    public static void main(String[] args) throws InterruptedException {
        
        List<List<TestTask>> table = new ArrayList<>();

        int sum = 0;
        int max = 0;
        Random rgen = new Random(0);
        for (int j = 0; j < 3; ++j) {
            List<TestTask> list = IntStream.range(0, 5 + rgen.nextInt(10))
                    .mapToObj(i -> new TestTask())
                    .collect(Collectors.toList());
            table.add(list);

            sum += list.size();
            max = Math.max(max, list.size());
        }

        System.out.println("table: "+ table.stream()
                .map(l -> Integer.toString(l.size()))
                .collect(Collectors.joining(", ")));
        System.out.printf("sum: %d, max: %d\n", sum, max);
        System.out.println("parallelism: "+ ForkJoinPool.commonPool().getParallelism());
        System.out.println();

        {
            // 1.base-parallel
            long t0 = System.currentTimeMillis();
            table.stream()
                    .parallel()
                    .flatMap(l -> l.stream())
                    .forEach(task -> task.run());
            System.out.printf("1.base-parallel:     %,5d msec\n", (System.currentTimeMillis() - t0));
        }

        {
            // 2.flat-map-parallel
            long t0 = System.currentTimeMillis();
            table.stream()
                    .flatMap(l -> l.parallelStream())
                    .forEach(task -> task.run());
            System.out.printf("2.flat-map-parallel: %,5d msec\n", (System.currentTimeMillis() - t0));
        }

        {
            // 3.both-parallel
            long t0 = System.currentTimeMillis();
            table.parallelStream()
            .flatMap(l -> l.parallelStream()
                    .peek(task -> task.run())
                    .collect(Collectors.toList())
                    .stream())
                    .forEach(task -> {});
            System.out.printf("3.both-parallel:     %,5d msec\n", (System.currentTimeMillis() - t0));
        }
    }
}

Le résultat est ci-dessous.

table: 5, 13, 14
sum: 32, max: 14
parallelism: 3

1.base-parallel:     1,424 msec
2.flat-map-parallel: 3,281 msec
3.both-parallel:       926 msec

Regardons chaque résultat tour à tour.

  1. Base-parallel met en parallèle uniquement la matrice supérieure. Par conséquent, il est affecté par un léger biais dans la double disposition des tâches. On peut confirmer que cela prend 14 * 100msec sous l'influence de 14 du tableau [3] .length.

  2. En parallèle, parallèle ne fonctionne pas comme mentionné au début. Par conséquent, puisque toutes les tâches sont traitées séquentiellement, cela prend un total de 32 * 100 msec de tâches.

  3. Dans les deux parallèles, la parallélisation est effectuée en insérant inutilement le traitement de terminaison collect dans le flatMap. En conséquence, il peut être terminé dans les plus brefs délais.

À propos, dans la réflexion confirmée, je n'ai trouvé aucune description du comportement ici même dans la description telle que Javadoc. Si vous savez qu'il y a une description quelque part, faites-le moi savoir.

Recommended Posts

Seul le flux de niveau supérieur peut être parallélisé avec Java Stream.
Le traitement Java 8 Stream peut être omis jusqu'à présent!
Remplacez seulement une partie de l'hôte URL par java
Quatre rangées avec gravité pouvant être jouées sur la console
Java Stream ne peut pas être réutilisé.
Assurez-vous de comparer le résultat Java compareTo avec 0
Les arguments Java à exécuter avec gradle run peuvent être spécifiés avec --args (depuis Gradle 4.9)
[Java 8] Suppression en double (et vérification en double) avec Stream
[java8] Pour comprendre l'API Stream
[Java] Vérification de l'existence des éléments avec Stream
Suivez le lien avec Selenium (Java)
Conversion de liste Java8 avec Stream map
Définissez la charge d'accès qui peut être modifiée graphiquement avec JMeter (partie 2)
Définissez la charge d'accès qui peut être modifiée graphiquement avec JMeter (Partie 1)
[Rails] "Pry-rails" qui peuvent être utilisés lors de l'enregistrement avec la méthode create
Essayez d'utiliser l'API Stream en Java
Essayez d'utiliser la télécommande Wii en Java
[Java] Obtenez la date avec la classe LocalDateTime
Incrémenté du troisième argument de la méthode iterate de la classe Stream ajoutée depuis Java9
[Java] Implémentation d'un lecteur de flux de ligne stricte qui lit uniquement avec CrLf.
Soyez prudent avec les demandes et les réponses lors de l'utilisation de Serverless Framework avec Java