Dieser Artikel ist der 12. Tag von Java Adventskalender 2016.
Am Tag zuvor war es die Easy Database-Object-Oriented Database (JPA) von leak4mk0, auch mit Java SE. Als nächstes folgt die objektorientierte Datenbank (JPA) von tkxlab auch in Java SE.
Es ist einige Jahre her, seit wir in die Multi-Core-Ära eingetreten sind, die das Ende des kostenlosen Mittagessens bedeuten soll. Der Grund ist, dass ich oft Web-Apps mache, aber dieses Jahr ist die Welle der parallelen Programmierung zu mir gekommen, die sich mit fast einzelnen Threads entwickelt hat (obwohl es Fälle gibt, in denen ich mir Multithreads wie Serve Red bewusst bin). .. Aus diesem Grund fasst der diesjährige Adventskalender eine Einführung in die parallele Programmierung zusammen.
Java wurde von Anfang an für die Multithread-Programmierung angepriesen und ist eine Sprache, die die parallele Programmierung so einfach wie sie ist macht. Es wurde mit dem Versions-Upgrade stetig erweitert und verfügt derzeit über die folgenden APIs.
Mal sehen, wie man es eins nach dem anderen benutzt.
"Vergiss mich"
Das ist ein halber Witz, aber in den meisten Fällen sollten Sie die einfache Thread-Klasse nicht direkt verwenden.
Ich habe auch damit begonnen, einfachen Thread zu verwenden, der auf alten Kenntnissen basiert, und wenn ich mit Multithreading google, ist er immer noch ganz oben. Es ist viel einfacher, Future und Parallel Stream zu verwenden, und was Sie mit Thread tun können, ist mit Executor normalerweise intelligenter.
Eine einfache Parallelverarbeitungs-API basierend auf einem Thread-Pool. Der Prozess wird in die Granularität "Task" zerlegt, dem Thread-Pool zugewiesen und ausgeführt. Da Task durch einen Klassen- oder Lambda-Ausdruck dargestellt wird, der die Runnable-Schnittstelle erbt, kann er effektiv als "Aufwärtskompatibilität von Thread, der Thread-Pools unterstützt" verwendet werden.
Außerdem verwenden wir normalerweise einen Executor (Name ist verwirrend!), Der als Executor Service bezeichnet wird und Future verwalten kann.
public static void main(String[] args) throws Exception {
ExecutorService es = Executors.newFixedThreadPool(2);
try {
es.execute(() -> System.out.println("executor:1, thread-id:" + Thread.currentThread().getId()));
es.execute(() -> System.out.println("executor:2, thread-id:" + Thread.currentThread().getId()));
es.execute(() -> System.out.println("executor:3, thread-id:" + Thread.currentThread().getId()));
es.execute(() -> System.out.println("executor:4, thread-id:" + Thread.currentThread().getId()));
es.execute(() -> System.out.println("executor:5, thread-id:" + Thread.currentThread().getId()));
} finally {
es.shutdown();
es.awaitTermination(1, TimeUnit.MINUTES);
}
}
Das Ausführungsergebnis ist wie folgt.
executor:1, thread-id:11
executor:2, thread-id:11
executor:3, thread-id:12
executor:4, thread-id:12
executor:5, thread-id:12
Sie können sehen, dass der in newFixedThreadPool angegebene Thread wiederverwendet wird, obwohl die Ausführung fünfmal ausgeführt wird. Executors ist eine Factory, die ExecutorService erstellt und über eine Methode zum Erstellen eines Thread-Pools mit der folgenden Strategie verfügt.
Methodenname | Überblick |
---|---|
newCachedThreadPool | Ein Thread-Pool, der nach Bedarf neue Threads erstellt, zuvor erstellte Threads jedoch wiederverwendet, sofern verfügbar |
newFixedThreadPool | Thread-Pool, der eine festgelegte Anzahl von Threads wiederverwendet |
newWorkStealingPool | Ein Thread-Pool, der die maximale Anzahl von CPU-Kernen oder die angegebene Anzahl von Parallelen enthält. Jedem Thread wird eine Aufgabenwarteschlange zugewiesen. Wenn die Warteschlange frei wird, werden Aufgaben von anderen Threads abgefangen.(Work Stealing)Herstellen |
newWorkStealingPool ist eine neue Methode, die in Java 8 veröffentlicht wurde. Threads werden effizient zugewiesen. Wenn also kein Problem vorliegt, wird es übernommen.
Führen Sie eine parallele Programmierung mit Fork und Join durch, z. B. Unix-Prozessverwaltung.
Es handelt sich um eine API, die entwickelt wurde, um eine CPU-schwere Verarbeitung durchzuführen, z. B. eine rekursive Split-Governance. Da ForkJoinPool verwendet wird, das den Work Stealing-Algorithmus verwendet, können Aufgaben, die kleiner als Executor sind, effizient ausgeführt werden.
Im Gegensatz zu Java 7 verfügt Java 8 jedoch auch über einen neuen WorkStealingPool in Executor, sodass sich die Ausführungseffizienz meines Erachtens nicht wesentlich ändern wird. (※ unbestätigt)
Ich bin gut darin, die Wiederholung zu beschleunigen. Machen wir also ein Beispiel, um die bekannte Fibonacci-Sequenz zu finden.
Zunächst eine Fibonacci-Zahlenfolge, die durch normale Wiederholung zum Vergleich erstellt wurde.
public static int fib(int n) {
if (n == 0) {
return 0;
} else if (n == 1) {
return 1;
} else {
return fib(n - 1) + fib(n - 2);
}
}
public static void main(String[] args) {
System.out.println(fib(45));
}
Wenn dies mit ForkJoin implementiert wird, sieht es wie folgt aus.
static class FibonacciTask extends RecursiveTask<Integer> {
private final int n;
FibonacciTask(int n) {
this.n = n;
}
@Override
protected Integer compute() {
if (n == 0) {
return 0;
} else if (n == 1) {
return 1;
} else {
FibonacciTask f1 = new FibonacciTask(n - 1);
FibonacciTask f2 = new FibonacciTask(n - 2);
f2.fork();
return f1.compute() + f2.join();
}
}
}
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool();
System.out.println(pool.invoke(new FibonacciTask(45)));
}
Ich schreibe einen Prozess, indem ich eine FibonacciTask erstelle, die die RecursiveTask erbt.
Ursprünglich wird die Erfassung der Werte von f1: fib (n-1) und f2: fib (f-2), die ursprünglich unter Verwendung von rekursiv erhalten wurden, durch rekursiv für f1 und fork / join für f2 realisiert. Ist der Punkt. Jedes Mal, wenn Sie sich teilen, wird eine neue asynchrone Aufgabe erstellt, sodass der Grad der Parallelität zunimmt. Da f1 eine rekursive Berechnung ist, wird die Verarbeitung kontinuierlich ausgeführt, ohne auf das Ergebnis von f2 zu warten. Infolgedessen kann eine große Anzahl von asynchronen Aufgaben ausgeführt werden und die parallele Berechnung kann fortgesetzt werden.
Die Aufrufmethode ist eine Methode, die "den Wartewert synchron zurückgibt, bis die Verarbeitung abgeschlossen ist". Es ist auch eine Wrapper-Methode für Task.fork.join.
Vielleicht habe ich jedoch einen Fehler beim Schreiben von etwas gemacht oder es war ein Overhead-Problem, und in dem Code, den ich dieses Mal geschrieben habe, ist der einfache Code, der in einem einzelnen Thread ausgeführt wird, fast doppelt so schnell wie Fork / Join. .. .. Wenn Sie es also tatsächlich verwenden, sollten Sie messen, ob es schneller ist. Dies hängt vom Anwendungsfall ab.
CompletableFuture ist eine API zur Realisierung von Future / Promise, einem der von Java8 eingeführten Entwurfsmuster für die parallele Programmierung. Mein Verständnis ist, dass es sich um ein Entwurfsmuster handelt, mit dem asynchrone Programmierung wie synchrone Programmierung behandelt wird, und im Fall von Java 8 wird die Asynchronität durch Threads garantiert.
Ich werde es so schreiben.
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService es = Executors.newWorkStealingPool();
//Einfache asynchrone Ausführung
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "heavy process", es);
System.out.println(future.get());
//Eine Synthese ist ebenfalls möglich
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "task1", es);
CompletableFuture<String> f2 = f1.thenApply((x) -> x + ":task2");
System.out.println(f2.get());
//Sie können es in eine Liste packen oder alles auf einmal verarbeiten.
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (int i = 0; i < 10; i++) {
final int n = i;
futures.add(CompletableFuture.runAsync(() -> System.out.println("task" + n), es));
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{}));
}
In diesem Beispiel wird ExecutorService als Argument übergeben. Selbst wenn Sie Threads in der Mitte wie JavaEE steuern und über einen ExecutorService verfügen, der dies unterstützt, können Sie ihn auf diese Weise ohne Konflikte verwenden. Es scheint, dass ForkJoinPool.commonPool () standardmäßig verwendet wird.
Erstens ist es eine einfache asynchrone Ausführung, aber es ist eine Form der Ausführung mit einem Lambda-Ausdruck als Argument mit supplyAsync oder runAsync. Durch Ausführen von get wartet es auf das Ende von Future und erhält den Rückgabewert.
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "heavy process", es);
System.out.println(future.get());
Der Vorteil von CompletableFuture besteht darin, dass Sie den asynchronen Prozess selbst als Variable in Form von Future behandeln können, sodass Sie einfach einen Prozess schreiben können, der das Ergebnis eines bestimmten asynchronen Prozesses als Argument verwendet. Wenn Sie beispielsweise eine Webseite abrufen, verarbeiten und in der Datenbank registrieren, übergeben Sie normalerweise eine Rückruffunktion als Argument. Dies führt leicht zur Rückrufhölle.
Im Fall von CompletableFuture können Sie thenApply usw. verwenden, um die Nachbearbeitung einer solchen asynchronen Verarbeitung auszudrücken, ohne den unten gezeigten Rückruf zu verwenden.
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "task1", es);
CompletableFuture<String> f2 = f1.thenApply((x) -> x + ":task2");
System.out.println(f2.get());
Diese Art der Verarbeitung wird als "Synthese" bezeichnet. Wenn Sie also nicht daran gewöhnt sind, werden Sie das Gefühl haben, "Was ist Komposition?". Aber ich hoffe, Sie werden verstehen, dass es keine so große Sache ist.
Da CompletableFuture nur ein Wert ist, kann es in eine Liste gepackt oder gleichzeitig mit einem Array (Argument variabler Länge in Bezug auf die Methode) mit allOf ausgeführt werden.
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (int i = 0; i < 10; i++) {
final int n = i;
futures.add(CompletableFuture.runAsync(() -> System.out.println("task" + n), es));
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{}));
Es gibt viele andere Funktionen von CompletableFuture, aber ich kann sie nicht ein wenig schreiben, deshalb werde ich sie hier weglassen.
Wenn man von der in Java8 eingeführten Parallelität spricht, denken viele Leute darüber nach. ParallelStream für parallele Erfassungsvorgänge.
Parallele Sammlungen werden seit langem in deklarativen, seitenfreien Sprachen wie Funktionssprachen verwendet, aber schließlich in Java in Java 8 eingeführt. Dies ist ein ganz anderer Geschmack als zuvor, und es ist eine Methode, die die Parallelität auf der Systemseite automatisch verwaltet, anstatt sich direkt mit Parallelität und Asynchronität zu befassen.
Im Gegensatz zu expliziten Paralleloperationen gibt es Einschränkungen, was erreicht werden kann, aber dies vereinfacht den Code erheblich. Aufgrund der Parallelisierung kann es auch viele andere Anwendungsfälle für Hochgeschwindigkeitsanforderungen abdecken.
Schauen wir uns zuerst den Code an.
public static void main(String[] args){
IntStream.range(1, 10).boxed()
.parallel()
.map(x -> "task" + x)
.forEach(System.out::println);
}
Es ist fast nicht von einem normalen Stream zu unterscheiden, oder? Der Unterschied besteht darin, dass .parallel hinzugefügt wird. Parallel dazu kann eine parallele Verarbeitung zusammengestellt werden. Da es sich um einen Stream handelt, ist es natürlich, den Rückgabewert der vorherigen Verarbeitung in der nachfolgenden Verarbeitung zu verwenden. Daher ist eine Geschichte wie "Future Synthesis" einfach.
Es ist eine sehr leistungsfähige und prägnante API, aber es gibt einige Einschränkungen. Denken Sie daran, dass es sich um eine parallele Programmierung handelt.
Beachten Sie, dass eine Variable oder eine andere externe Ressource als eine Umgebung, die Parallelität wie Ausschluss unterstützt, leicht beschädigt werden kann.
Neben der Standard-Java-Bibliothek gibt es weitere Bibliotheken, die Parallelität unterstützen. Die meiste Zeit habe ich es noch nicht ausprobiert, aber ich werde es nur erwähnen.
http://spark.apache.org/
Es ist eine verteilte Verarbeitungsplattform für die Ausführung großer Chargen. Für das Programmiermodell wird die parallele Erfassung wie in Parallel Stream übernommen.
Ich habe dies anders als in anderen Bibliotheken verwendet. Lesen Sie daher bitte den folgenden Artikel.
http://doc.akka.io/docs/akka/snapshot/intro/getting-started.html
Es ist eine Bibliothek / Middleware, um das Akteurmodell zu realisieren. Es ist von Scala abgeleitet, kann aber auch in Java verwendet werden. Obwohl es sich um eine parallele Programmierbibliothek für den Umgang mit Akteuren handelt, die leichter als Threads sind, wird diese Erkennung häufiger als verteilte Systeminfrastruktur verwendet.
https://www.cs.kent.ac.uk/projects/ofa/jcsp/
Es scheint eine Bibliothek für die Durchführung von Prozessalgebra in Java zu sein. Die Prozessalgebra ist eine mathematische Methode für parallele Berechnungen, daher würde ich sie gerne beim nächsten Mal ausprobieren.
Nun, es ist einfach, aber ich habe die parallele Programmierung in Java zusammengefasst. Ich denke, dass viele Menschen ein starkes Image von einfachem Multithreading haben, aber Java hat sich stark verändert.
Grundsätzlich wird Parallel Stream und in schwierigen Fällen Completable Future wahrscheinlich in Zukunft zum Mainstream. Ich denke, es ist einfacher, sicheren Code zu schreiben, da sie nicht unbedingt gemeinsame Variablen zum Übergeben von Werten zwischen Threads verwenden. Ich habe es diesmal nicht eingeführt, aber es gibt Fälle, in denen Sie nicht blockierende Sammlungen verwenden können, auch wenn Sie unbedingt gemeinsame Variablen benötigen. Ich denke, Sie werden sie nach Bedarf verwenden.
Darüber hinaus ist in jüngster Zeit die groß angelegte Parallelisierung durch verteilte Systeme dank der Cloud über die parallele Programmierung auf einer einzelnen Maschine hinaus bekannt geworden. Ich möchte auch hier viel lernen.
Dann wird nächstes Jahr Happy Hacking!
Recommended Posts