Java verfügt über eine Funktion namens ParallelStream, mit der Sie problemlos Parallelverarbeitung schreiben können. Bei ordnungsgemäßer Verwendung kann die Leistung erheblich verbessert werden. Es ist eine Funktion, die in Java8 hinzugefügt wurde, also neu, aber kürzlich habe ich die Güte paralleler Streams erkannt, also werde ich sie schreiben.
Betrachten wir hier die Auswirkung paralleler Streams am Beispiel des Codes, der prüft, ob die Anzahl der Capreca (Definition 2) [^ 1] für numerische Werte von 0 bis 600 Millionen gilt. [^ 1]: Sie müssen nicht wissen, wie viele Capreca es gibt. Auf jeden Fall ist es eine Art Verarbeitung, die die CPU belastet. Wenn Sie es noch wissen möchten, gehen Sie bitte auf Google. Auch die Zahl 600 Millionen hat keine besondere Bedeutung. Es ist nur die Anzahl der Fälle, deren Bearbeitung eine gute Zeit in Anspruch nimmt.
Schauen wir uns zunächst ein Beispiel an, das keine parallelen Streams verwendet. (Sequentieller Stream) Es ist schlampig und lang, schauen Sie sich also zunächst die unten stehende Hauptmethode an.
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
public class ParallelStreamSample {
//Ob es die Anzahl der Capreca ist (Definition 2)
private static boolean isKaprekarNumber2(long n) {
List<Character> chars = String.valueOf(n)
.chars()
.mapToObj(c -> (char)c)
.collect(Collectors.toList());
// min
chars.sort(Comparator.naturalOrder());
var min = parseLong(join(chars));
// max
chars.sort(Comparator.reverseOrder());
var max = parseLong(join(chars));
return n == (max - min);
}
private static <T> String join(List<T> list) {
var sb = new StringBuilder();
for(T item : list) {
sb.append(item);
}
return sb.toString();
}
private static long parseLong(String s) {
if(s.isEmpty()) {
return 0;
} else {
return Long.parseLong(s);
}
}
public static void main(String[] args) {
System.out.println("--------------------");
System.out.println("Capreca-Nummerndefinition 2");
long start = System.nanoTime();
LongStream.rangeClosed(0, 600_000_000)
//Sequentieller Stream (normalerweise ist dieser Aufruf nicht erforderlich, wird jedoch zum Vergleich mit parallelem Stream beschrieben)
.sequential()
.filter(n -> n % 9 == 0)
.filter(ParallelStreamSample::isKaprekarNumber2)
.forEachOrdered(System.out::println);
long end = System.nanoTime();
System.out.printf("Verarbeitungszeit(ms): %d\n", (end - start) / 1_000_000);
System.out.println("--------------------");
}
}
In meiner Umgebung ist das Ergebnis wie folgt.
--------------------
Capreca-Nummerndefinition 2
0
495
6174
549945
631764
63317664
97508421
554999445
Verarbeitungszeit(ms): 60284
--------------------
Es dauert etwas mehr als eine Minute. Ich würde es gerne etwas schneller machen, aber wie schnell wäre es mit parallelen Streams?
Schauen wir uns als nächstes ein Beispiel für die Verwendung paralleler Streams an.
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
public class ParallelStreamSample {
//Ob es die Anzahl der Capreca ist (Definition 2)
private static boolean isKaprekarNumber2(long n) {
List<Character> chars = String.valueOf(n)
.chars()
.mapToObj(c -> (char)c)
.collect(Collectors.toList());
// min
chars.sort(Comparator.naturalOrder());
var min = parseLong(join(chars));
// max
chars.sort(Comparator.reverseOrder());
var max = parseLong(join(chars));
return n == (max - min);
}
private static <T> String join(List<T> list) {
var sb = new StringBuilder();
for(T item : list) {
sb.append(item);
}
return sb.toString();
}
private static long parseLong(String s) {
if(s.isEmpty()) {
return 0;
} else {
return Long.parseLong(s);
}
}
public static void main(String[] args) {
System.out.println("--------------------");
System.out.println("Capreca-Nummerndefinition 2");
long start = System.nanoTime();
LongStream.rangeClosed(0, 600_000_000)
//Paralleler Stream
.parallel()
.filter(n -> n % 9 == 0)
.filter(ParallelStreamSample::isKaprekarNumber2)
.forEachOrdered(System.out::println);
long end = System.nanoTime();
System.out.printf("Verarbeitungszeit(ms): %d\n", (end - start) / 1_000_000);
System.out.println("--------------------");
}
}
Mit Ausnahme des Kommentars wurde nur eine Zeile geändert. (Sequential ()
wird in parallel ()
geändert)
Auf diese Weise können Sie parallele Streams anstelle von sequentiellen Streams verwenden.
Dies hat die folgenden Ergebnisse in meiner Umgebung.
--------------------
Capreca-Nummerndefinition 2
0
495
6174
549945
631764
63317664
97508421
554999445
Verarbeitungszeit(ms): 22366
--------------------
Es sind ungefähr 22 Sekunden. Der sequentielle Stream dauerte ungefähr 1 Minute, sodass die Verarbeitungszeit weniger als die Hälfte beträgt. Es ist wunderbar.
Dies liegt daran, dass der sequentielle Stream nur einen einzigen Kern für die Ausführung verwendet, während der parallele Stream mehrere Kerne für die gemeinsame Verarbeitung verwendet. Es ist, als würde man einen überspringenden Kern auspeitschen, damit er schneller funktioniert. (Nur Bild)
Dieses Beispiel ist mit nur einem Zeilenwechsel schneller, funktioniert aber nicht immer so. Um die Vorteile paralleler Streams nutzen zu können, müssen einige Bedingungen erfüllt sein.
Dies ist eine wichtige Voraussetzung. Da es sich um einen Mechanismus zur Beschleunigung durch Verwendung mehrerer Kerne handelt, ist er in einer einzelnen Kernumgebung nicht schneller. Vielmehr wird es durch den Overhead für die Parallelverarbeitung verlangsamt.
Dies ist auch eine wichtige Voraussetzung. Es ist ein Image, das es schneller macht, indem alle CPU-Ressourcen verwendet werden. Es ist also bedeutungslos, wenn etwas anderes als die CPU der Engpass ist.
Das Ausführen auf mehreren Kernen bedeutet das Ausführen auf mehreren Threads. Daher muss die auszuführende Verarbeitung threadsicher sein. Die obige Methode "isKaprekarNumber2 ()" ist threadsicher, da sie nur von den Argumenten abhängt. Lassen Sie uns jedoch sehen, was in einer nicht threadsicheren Implementierung geschieht, die vom Feld anstelle des Arguments abhängt. (Es ist ein seltsames Beispiel ...)
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
public class ThreadUnsafeSample {
public static void main(String[] args) {
System.out.println("--------------------");
System.out.println("Capreca-Nummerndefinition 2");
long start = System.nanoTime();
var myNum = new MyNumber();
LongStream.rangeClosed(0, 600_000_000)
.parallel()
.filter(n -> n % 9 == 0)
.filter(n -> {
//Nicht threadsicher, da Felder ohne ausschließliche Kontrolle gelesen und geschrieben werden
myNum.setNum(n);
return myNum.isKaprekarNumber2();
})
.forEachOrdered(System.out::println);
long end = System.nanoTime();
System.out.printf("Verarbeitungszeit(ms): %d\n", (end - start) / 1_000_000);
System.out.println("--------------------");
}
private static class MyNumber {
private long num;
private void setNum(long num) {
this.num = num;
}
//Ob es die Anzahl der Capreca ist (Definition 2)
private boolean isKaprekarNumber2() {
List<Character> chars = String.valueOf(num)
.chars()
.mapToObj(c -> (char)c)
.collect(Collectors.toList());
// min
chars.sort(Comparator.naturalOrder());
var min = parseLong(join(chars));
// max
chars.sort(Comparator.reverseOrder());
var max = parseLong(join(chars));
return num == (max - min);
}
private <T> String join(List<T> list) {
var sb = new StringBuilder();
for(T item : list) {
sb.append(item);
}
return sb.toString();
}
private long parseLong(String s) {
if(s.isEmpty()) {
return 0;
} else {
return Long.parseLong(s);
}
}
}
}
In meiner Umgebung habe ich die folgenden anscheinend falschen Ergebnisse erhalten:
--------------------
Capreca-Nummerndefinition 2
Verarbeitungszeit(ms): 23902
--------------------
Wie Sie sehen, funktioniert die Verwendung paralleler Streams in einer nicht threadsicheren Implementierung nicht wie erwartet. Seien Sie also vorsichtig. Wenn ich "parallal ()" gelöscht oder in "sequential ()" umgeschrieben und in einem sequentiellen Stream ausgeführt habe, war das Ergebnis natürlich korrekt. (Abgesehen davon, dass ich zu spät komme ...)
--------------------
Capreca-Nummerndefinition 2
0
495
6174
549945
631764
63317664
97508421
554999445
Verarbeitungszeit(ms): 75049
--------------------
In der Fortsetzung des Obigen wird das korrekte Ergebnis erhalten, wenn eine exklusive Steuerung im Lese- / Schreibteil des Feldes durchgeführt wird.
.filter(n -> {
synchronized(myNum) {
myNum.setNum(n);
return myNum.isKaprekarNumber2();
}
})
Das Folgende ist das Ausführungsergebnis.
--------------------
Capreca-Nummerndefinition 2
0
495
6174
549945
631764
63317664
97508421
554999445
Verarbeitungszeit(ms): 90823
--------------------
Die Ausgabe ist korrekt, aber 1,5-mal langsamer als die ursprüngliche Implementierung. Dies liegt daran, dass die exklusive Steuerung effektiv zur Ausführung in einem einzelnen Thread führt (+ es gibt einen Overhead für die parallele Ausführung). Sie wissen nicht, wofür der parallele Stream ist.
Wenn Sie parallele Streams gut verwenden, können Sie mit einer deutlichen Leistungssteigerung rechnen. Es gibt jedoch viele Bedingungen, um den Effekt zu erzielen. Verwenden Sie ihn daher mit Vorsicht.
Über den folgenden Teil
// max
chars.sort(Comparator.reverseOrder());
Eigentlich scheint das Folgende schneller zu sein. (Zu diesem Zeitpunkt wird "Zeichen" in aufsteigender Reihenfolge sortiert.)
// max
Collections.reverse(chars);
Die -Korrektur ist jedoch problematisch </ s> und wirkt sich nicht so stark auf das Hauptthema aus. Deshalb lasse ich es so, wie es ist.