Avec l'évolution de Java, la programmation parallèle est devenue plus facile à écrire, mais il est encore difficile d'écrire une implémentation correcte et rapide du traitement parallèle. Cela ne fait pas exception au traitement des flux dans des flux parallèles. Regardons l'exemple de Item45.
package tryAny.effectiveJava;
import static java.math.BigInteger.*;
import java.math.BigInteger;
import java.util.stream.Stream;
public class MersennePrimes {
public static void main(String[] args) {
primes().map(p -> TWO.pow(p.intValueExact()).subtract(ONE)).filter(mersenne -> mersenne.isProbablePrime(50))
.limit(20)
// .forEach(System.out::println);
.forEach(mp -> System.out.println(mp.bitLength() + ":" + mp));
}
static Stream<BigInteger> primes() {
return Stream.iterate(TWO, BigInteger::nextProbablePrime);
}
}
Même si parallel () est inclus dans le traitement de flux de ce programme, le traitement ne sera pas terminé et le processeur continuera à rester élevé, au lieu d'accélérer le traitement.
Qu'est-ce qu'il se passe ici? C'est simplement parce que la bibliothèque de flux ne savait pas comment paralléliser ce pipeline et que la solution heuristique a échoué. ** S'il provient à l'origine de Stream.iterate ou si des opérations intermédiaires limites sont effectuées, la parallélisation du pipeline est difficile pour améliorer les performances. ** Le programme ci-dessus comprend ces deux éléments. La leçon ici est que ** n'utilisez pas de flux parallèles sans discernement **.
En général, ** la parallélisation peut améliorer les performances des flux avec ArrayList, HashMap, HashSet, ConcurrentHashMap, des tableaux, des plages int et des plages longues. ** ** Ce qu'ils ont en commun, c'est qu'ils sont faciles à diviser en sous-plages. Une autre chose importante que ces structures de données ont en commun est la ** localité de référence lorsqu'elle est traitée séquentiellement. % 85% A7% E3% 81% AE% E5% B1% 80% E6% 89% 80% E6% 80% A7) **.
Les opérations de terminaison affectent également les performances du traitement parallèle. Si une grande quantité de traitement est effectuée par le traitement de terminaison et que le traitement de terminaison effectue en interne un traitement séquentiel par rapport à l'ensemble du pipeline, la parallélisation du pipeline n'est pas très efficace. Le processus de terminaison le plus efficace est le processus de réduction tel que min, max, count et sum. En outre, [évaluation de court-circuit] comme anyMatch, allMatch, noneMatch (https://ja.wikipedia.org/wiki/%E7%9F%AD%E7%B5%A1%E8%A9%95%E4%BE%A1) Est facile d'obtenir l'effet de la parallélisation. Les opérations de réduction de variables effectuées par la méthode de collecte du flux sont moins susceptibles de bénéficier de la parallélisation. En effet, la surcharge de traitement de la connexion des collections est coûteuse.
Safety failure ** La parallélisation de flux entraîne non seulement des problèmes de performances, y compris des échecs de vivacité, mais peut également entraîner des résultats incorrects et un comportement inattendu. (Échec de sécurité) ** Un échec de sécurité se produit lorsque vous utilisez une fonction qui n'est pas conforme aux spécifications strictes du flux. Par exemple, la fonction qui s'accumule et la fonction qui se combine au flux est [combinée](https://docs.oracle.com/javase/jp/8/docs/api/java/util/stream/ Dans package-summary.html # Associativity), dans Non-interférence , Stateless doit être une fonction. Si vous ne suivez pas cela, les pipelines droits ne poseront aucun problème, mais les pipelines parallélisés peuvent avoir des conséquences désastreuses.
Même si le traitement parallèle peut être effectué dans de très bonnes conditions, il n'a de sens que s'il montre des performances compensant le coût de la parallélisation. Une estimation approximative devrait satisfaire (nombre d'éléments dans le flux) * (nombre de lignes de code exécutées par élément)> 100000 (comme 10000 en regardant la source du lien .. (http: //gee.cs.). oswego.edu / dl / html / StreamParallelGuidance.html)).
Il faut reconnaître que la parallélisation de flux est une optimisation des performances. Vous devez vous assurer que toute optimisation vaut la peine d'être testée avant et après le changement. Idéalement, les tests doivent être effectués dans un environnement de production.
** Si la parallélisation est effectuée dans les bonnes circonstances, une amélioration des performances peut être attendue proportionnellement au nombre de processeurs. ** ** Il est facile d'améliorer ces performances dans les domaines de l'apprentissage automatique et du traitement de données.
[Fonction de comptage principale] qui peut être parallélisée efficacement (https://ja.wikipedia.org/wiki/%E7%B4%A0%E6%95%B0%E8%A8%88%E6%95%B0 Prenons un exemple de% E9% 96% A2% E6% 95% B0).
package tryAny.effectiveJava;
import java.math.BigInteger;
import java.util.stream.LongStream;
public class ParallelTest1 {
// Prime-counting stream pipeline - benefits from parallelization
static long pi(long n) {
return LongStream.rangeClosed(2, n).mapToObj(BigInteger::valueOf).filter(i -> i.isProbablePrime(50)).count();
}
public static void main(String[] args) {
StopWatch sw = new StopWatch();
sw.start();
System.out.println(pi(10000000));
sw.stop();
System.out.println(sw.getTime());
}
}
Il a fallu environ 42 secondes pour traiter le code ci-dessus. Parallélisez cela.
package tryAny.effectiveJava;
import java.math.BigInteger;
import java.util.stream.LongStream;
import org.apache.commons.lang3.time.StopWatch;
public class ParallelTest1 {
// Prime-counting stream pipeline - benefits from parallelization
static long pi(long n) {
return LongStream.rangeClosed(2, n).parallel().mapToObj(BigInteger::valueOf).filter(i -> i.isProbablePrime(50))
.count();
}
public static void main(String[] args) {
StopWatch sw = new StopWatch();
sw.start();
System.out.println(pi(10000000));
sw.stop();
System.out.println(sw.getTime());
}
}
Cela s'est terminé en environ 23 secondes. (Fonctionne sur une machine à 2 cœurs)
Si vous souhaitez générer des valeurs aléatoires en parallèle, vous devez utiliser SplittableRandom plutôt que ThreadLocalRandom.