Ich schreibe und verstehe nur den gRPC-Server, aber ich weiß nicht viel darüber, also kann ich nicht weitermachen. Versuchen Sie es jedoch langsam auf Ihre eigene Weise. Das zweite Thema befasst sich mit den Schnittstellen "Future" und "Callable". Ohne dies zu verstehen, kann "gRPC" nicht verstanden werden. Dieser Bereich hat einen Duft, der dem Mechanismus von C # Task und async / await ähnlich zu sein scheint, also als Thema
Ich möchte ** FanOut / FanIn ** in Java implementieren.
Die Schnittstelle ist so beschaffen, dass sie einfach einen Rückgabewert vom Typ T zurückgibt.
@FunctionalInterface
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
Erstellen wir eine Implementierungsklasse, die dies implementiert. Dies ist der Teil, der parallel arbeitet. Da davon ausgegangen wird, dass mehrere Threads parallel ausgeführt werden, wird die ID von CurrentThread angezeigt. Eine Sache, auf die ich hier neugierig bin, ist, war es in Ordnung, "Thread.sleep ()" zu verwenden? Ich sage das. Bei async / await in C # ist dies unzulässig, da asynchrone Aufrufe Thread nicht immer belegen, sodass es unpraktisch ist, Thread in den Ruhezustand zu versetzen. Selbst wenn ich Java-Verzögerung - 4 Möglichkeiten zum Hinzufügen von Verzögerung in Java gelesen habe, gibt es keine besonderen Kommentare. Lassen Sie es uns jetzt verwenden.
Activity
public class Activity implements Callable<String> {
public String call() throws Exception {
Thread currentThread = Thread.currentThread();
for (int i = 0; i < 100; i++) {
Thread.sleep(100);
System.out.println("[" + currentThread.getId() + "] Activity: " + i);
}
System.out.println("[" + currentThread.getId() +"] Done");
return "Thread [" + currentThread.getId() + "] has done.";
}
}
Erstellen Sie nun eine Klasse, die dies ausführt. Ich möchte Aktivitäten parallel verarbeiten und wenn alle fertig sind, möchte ich das Ergebnis anzeigen und fertig stellen.
Für Java scheint es etwas zu verwenden, das "ExecutorService" genannt wird. In Executors
wurden mehrere Methoden definiert, und diejenigen, die wir jetzt verwenden, sind definiert, um eine feste Anzahl von Thread-Pools, einen einzelnen Thread-Pool, einen Cache usw. zu erstellen. In diesem Fall verfügt es über 5 Thread-Pools. Im Gegensatz zu C # belegt es den Thread von Zeit zu Zeit. Ist es also in Ordnung, "Thread.sleep ()" zu verwenden?
Future interface Zukunft ist ein Bild des Äquivalents von "Aufgabe" in C #. Dieses Objekt wird als Rückgabewert zurückgegeben, wenn der Thread ausgeführt und die parallele Arbeit gestartet wird. Das Ausgeben der get-Methode blockiert, bis der Thread die Verarbeitung beendet hat.
public interface Future<V>
{
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
Die Methode executor.submit
erstellt diese Zukunft
. Wenn Sie hier die vorherige Aktivität übergeben haben, wird die Verarbeitung parallel ausgeführt. Speichern Sie das Objekt von "Future" in der Liste. Warten Sie dann bei executor.awaitTermination
. Das Endergebnis wird mit der Methode "get ()" erhalten und angezeigt. Hier gibt es jedoch ein Problem.
public class FanOutFanIn {
public void execute() throws InterruptedException, ExecutionException {
ExecutorService executor = Executors.newFixedThreadPool(5);
List<Future<String>> results = new ArrayList<>();
for (int i = 0; i < 10 ; i++) {
Future<String> result = executor.submit(new Activity());
results.add(result);
}
executor.awaitTermination(30, TimeUnit.SECONDS);
for (int i = 0; i < results.size(); i++) {
System.out.println(results.get(i).get());
}
}
}
Mein ursprüngliches Bild war, dass die gesamte Verarbeitung abgeschlossen war oder das Ausführungsergebnis mit einer Zeitüberschreitung angezeigt wurde, aber das ist nicht der Fall. awaitTermination
blockiert auch nach Abschluss der Verarbeitung weiter. Mit diesem Code wird erst nach Ablauf von 30 Sekunden mit dem nächsten Block fortgefahren.
CompletableFuture
Daher scheint es etwas zu geben, das als "abgeschlossene Zukunft" bezeichnet wird. Die eigentliche Schnittstellenimplementierung ist lang, daher werde ich sie weglassen, aber es ist eine Klasse, die die Schnittstellen "Future" und "CompletionStage" implementiert. Von der "Zukunft" hat es eine definitive "Vollendung". Dies scheint für eine Vielzahl von Zwecken nützlich zu sein, scheint aber auch für "FanOut / FanIn" -Szenarien geeignet zu sein. Schreiben Sie die obige, die so funktioniert, wie Sie es möchten.
"CompletableFuture" wird von "Runnable" implementiert, da "Callable" nicht verwendet werden kann. Es ist etwas klobig und ich denke, es könnte besser geschrieben werden, aber Runnable
kann keinen Rückgabewert zurückgeben. Wenn ich mich wie der vorherige Code verhalten wollte (Rückgabe eines Strings), ist das ein Schmerz, aber ich habe eine Methode namens "getResult" implementiert. Jep. Es muss einen besseren Weg geben, es zu schreiben. Bitte lass ein Kommentar da.
public class RunnableActivity implements Runnable {
private String result;
@Override
public void run() {
Thread currentThread = Thread.currentThread();
for (int i = 0; i < 100; i++) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("[" + currentThread.getId() + "] Activity: " + i);
}
System.out.println("[" + currentThread.getId() +"] Done");
this.result = "Thread [" + currentThread.getId() + "] has done.";
}
public String getResult() {
return this.result;
}
}
Ich schreibe "runAsync" selbst, aber ich schreibe es selbst, um den Rückgabewert von Runnable zurückzusetzen. Jep. Es muss einen besseren Weg geben. Es ist ähnlich wie der vorherige, aber da ich "CompletableFuture" verwende, verwende ich "Runnable" und mit der "CompletableFuture.allOf" -Methode fühlt es sich diesmal gut an und alles endet. Werde auf dich warten. Dies ist der, den ich wollte.
Wenn Sie fertig sind, können Sie dieselbe Schleife wie beim letzten Mal verwenden. Wenn jedoch alle Threads ordnungsgemäß abgeschlossen sind, können Sie den Wert mithilfe von Linq-ähnlichem Schreiben als Lambda abrufen und ausgeben.
public class CompletableFanOutFanIn {
public void execute() throws InterruptedException, ExecutionException {
ExecutorService executor = Executors.newFixedThreadPool(5);
// ExecutorService executor = Executors.newSingleThreadExecutor();
List<CompletableFuture<String>> results = new ArrayList<>();
for (int i = 0; i < 10; i++) {
CompletableFuture<String> result = runAsync(new RunnableActivity(), executor);
results.add(result);
}
CompletableFuture<Void> cf = CompletableFuture.allOf(results.toArray(new CompletableFuture[10]));
cf.whenComplete((ret, ex) -> {
if(ex == null) {
String msg = results.stream().map(future -> {
try {
return future.get();
} catch (Exception iex) {
return iex.toString();
}
}).collect(Collectors.joining("\n"));
System.out.println("result = " + msg);
} else {
System.err.println(ex);
}
});
}
private CompletableFuture<String> runAsync(RunnableActivity runnable, ExecutorService executor) {
CompletableFuture<String> cf = new CompletableFuture<>();
executor.execute(() -> {
runnable.run();
cf.complete(runnable.getResult());
});
return cf;
}
}
Result
Übrigens wird mit C # Task stattdessen Thread.id geteilt, aber Java scheint eindeutig ein Thread zu sein.
[23] Activity: 0
[22] Activity: 0
[21] Activity: 0
[20] Activity: 0
[24] Activity: 0
[20] Activity: 1
[23] Activity: 1
[22] Activity: 1
[24] Activity: 1
[21] Activity: 1
[20] Activity: 2
:
[20] Activity: 97
[24] Activity: 97
[22] Activity: 97
[20] Activity: 98
[23] Activity: 98
[21] Activity: 98
[24] Activity: 98
[22] Activity: 98
[22] Activity: 99
[22] Done
[24] Activity: 99
[24] Done
[23] Activity: 99
[23] Done
[21] Activity: 99
[21] Done
[20] Activity: 99
[20] Done
result = Thread [20] has done.
Thread [21] has done.
Thread [22] has done.
Thread [23] has done.
Thread [24] has done.
Thread [21] has done.
Thread [20] has done.
Thread [22] has done.
Thread [23] has done.
Thread [24] has done.
Um ehrlich zu sein, bin ich nach Java8 immer noch nicht zuversichtlich, dass Programme gleichzeitig ausgeführt werden. Nehmen Sie sich Zeit und lernen Sie hart. Rund um Pluralsight. Beeilen Sie sich nicht, Schritt für Schritt. Zunächst konnte ich das umsetzen, was ich ursprünglich für den ersten Schritt gehalten hatte, und bin heute glücklich.
Recommended Posts