Aktualisieren Sie Ihre Java-Kenntnisse, indem Sie einen gRPC-Server in Java schreiben (2).

Ich schreibe und verstehe nur den gRPC-Server, aber ich weiß nicht viel darüber, also kann ich nicht weitermachen. Versuchen Sie es jedoch langsam auf Ihre eigene Weise. Das zweite Thema befasst sich mit den Schnittstellen "Future" und "Callable". Ohne dies zu verstehen, kann "gRPC" nicht verstanden werden. Dieser Bereich hat einen Duft, der dem Mechanismus von C # Task und async / await ähnlich zu sein scheint, also als Thema

Ich möchte ** FanOut / FanIn ** in Java implementieren.

Aufrufbare Schnittstelle

Die Schnittstelle ist so beschaffen, dass sie einfach einen Rückgabewert vom Typ T zurückgibt.

@FunctionalInterface
public interface Callable<V> {
/**
 * Computes a result, or throws an exception if unable to do so.
 *
 * @return computed result
 * @throws Exception if unable to compute a result
 */
 
V call() throws Exception;
}

Erstellen wir eine Implementierungsklasse, die dies implementiert. Dies ist der Teil, der parallel arbeitet. Da davon ausgegangen wird, dass mehrere Threads parallel ausgeführt werden, wird die ID von CurrentThread angezeigt. Eine Sache, auf die ich hier neugierig bin, ist, war es in Ordnung, "Thread.sleep ()" zu verwenden? Ich sage das. Bei async / await in C # ist dies unzulässig, da asynchrone Aufrufe Thread nicht immer belegen, sodass es unpraktisch ist, Thread in den Ruhezustand zu versetzen. Selbst wenn ich Java-Verzögerung - 4 Möglichkeiten zum Hinzufügen von Verzögerung in Java gelesen habe, gibt es keine besonderen Kommentare. Lassen Sie es uns jetzt verwenden.

Activity

public class Activity implements Callable<String> {
    public String call() throws Exception {
        Thread currentThread = Thread.currentThread();
        for (int i = 0; i < 100; i++) {
            Thread.sleep(100);
            System.out.println("[" + currentThread.getId() + "] Activity: " + i);
        }
        System.out.println("[" + currentThread.getId() +"] Done");
        return "Thread [" + currentThread.getId() + "] has done.";
    }
}

Erstellen Sie nun eine Klasse, die dies ausführt. Ich möchte Aktivitäten parallel verarbeiten und wenn alle fertig sind, möchte ich das Ergebnis anzeigen und fertig stellen. Für Java scheint es etwas zu verwenden, das "ExecutorService" genannt wird. In Executors wurden mehrere Methoden definiert, und diejenigen, die wir jetzt verwenden, sind definiert, um eine feste Anzahl von Thread-Pools, einen einzelnen Thread-Pool, einen Cache usw. zu erstellen. In diesem Fall verfügt es über 5 Thread-Pools. Im Gegensatz zu C # belegt es den Thread von Zeit zu Zeit. Ist es also in Ordnung, "Thread.sleep ()" zu verwenden?

Future interface Zukunft ist ein Bild des Äquivalents von "Aufgabe" in C #. Dieses Objekt wird als Rückgabewert zurückgegeben, wenn der Thread ausgeführt und die parallele Arbeit gestartet wird. Das Ausgeben der get-Methode blockiert, bis der Thread die Verarbeitung beendet hat.

public interface Future<V> 
{
    boolean cancel(boolean mayInterruptIfRunning);
 
    boolean isCancelled();
 
    boolean isDone();
 
    V get() throws InterruptedException, ExecutionException;
 
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

Die Methode executor.submit erstellt diese Zukunft. Wenn Sie hier die vorherige Aktivität übergeben haben, wird die Verarbeitung parallel ausgeführt. Speichern Sie das Objekt von "Future" in der Liste. Warten Sie dann bei executor.awaitTermination. Das Endergebnis wird mit der Methode "get ()" erhalten und angezeigt. Hier gibt es jedoch ein Problem.

public class FanOutFanIn {
    public void execute() throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        List<Future<String>> results = new ArrayList<>();

        for (int i = 0; i < 10 ; i++) {
            Future<String> result = executor.submit(new Activity());
            results.add(result);
        }

        executor.awaitTermination(30, TimeUnit.SECONDS);

        for (int i = 0; i < results.size(); i++) {
            System.out.println(results.get(i).get());
        }

    }
}

Mein ursprüngliches Bild war, dass die gesamte Verarbeitung abgeschlossen war oder das Ausführungsergebnis mit einer Zeitüberschreitung angezeigt wurde, aber das ist nicht der Fall. awaitTermination blockiert auch nach Abschluss der Verarbeitung weiter. Mit diesem Code wird erst nach Ablauf von 30 Sekunden mit dem nächsten Block fortgefahren.

CompletableFuture

Daher scheint es etwas zu geben, das als "abgeschlossene Zukunft" bezeichnet wird. Die eigentliche Schnittstellenimplementierung ist lang, daher werde ich sie weglassen, aber es ist eine Klasse, die die Schnittstellen "Future" und "CompletionStage" implementiert. Von der "Zukunft" hat es eine definitive "Vollendung". Dies scheint für eine Vielzahl von Zwecken nützlich zu sein, scheint aber auch für "FanOut / FanIn" -Szenarien geeignet zu sein. Schreiben Sie die obige, die so funktioniert, wie Sie es möchten.

"CompletableFuture" wird von "Runnable" implementiert, da "Callable" nicht verwendet werden kann. Es ist etwas klobig und ich denke, es könnte besser geschrieben werden, aber Runnable kann keinen Rückgabewert zurückgeben. Wenn ich mich wie der vorherige Code verhalten wollte (Rückgabe eines Strings), ist das ein Schmerz, aber ich habe eine Methode namens "getResult" implementiert. Jep. Es muss einen besseren Weg geben, es zu schreiben. Bitte lass ein Kommentar da.

public class RunnableActivity implements Runnable {
    private String result;
    @Override
    public void run() {
        Thread currentThread = Thread.currentThread();
        for (int i = 0; i < 100; i++) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("[" + currentThread.getId() + "] Activity: " + i);
        }
        System.out.println("[" + currentThread.getId() +"] Done");
        this.result =  "Thread [" + currentThread.getId() + "] has done.";
    }

    public String getResult() {
        return this.result;
    }
}

Ich schreibe "runAsync" selbst, aber ich schreibe es selbst, um den Rückgabewert von Runnable zurückzusetzen. Jep. Es muss einen besseren Weg geben. Es ist ähnlich wie der vorherige, aber da ich "CompletableFuture" verwende, verwende ich "Runnable" und mit der "CompletableFuture.allOf" -Methode fühlt es sich diesmal gut an und alles endet. Werde auf dich warten. Dies ist der, den ich wollte.

Wenn Sie fertig sind, können Sie dieselbe Schleife wie beim letzten Mal verwenden. Wenn jedoch alle Threads ordnungsgemäß abgeschlossen sind, können Sie den Wert mithilfe von Linq-ähnlichem Schreiben als Lambda abrufen und ausgeben.

public class CompletableFanOutFanIn {

        public void execute() throws InterruptedException, ExecutionException {
           ExecutorService executor = Executors.newFixedThreadPool(5);
           // ExecutorService executor = Executors.newSingleThreadExecutor();
            List<CompletableFuture<String>> results = new ArrayList<>();

            for (int i = 0; i < 10; i++) {
                CompletableFuture<String> result = runAsync(new RunnableActivity(), executor);
                results.add(result);
            }
            CompletableFuture<Void> cf = CompletableFuture.allOf(results.toArray(new CompletableFuture[10]));

            cf.whenComplete((ret, ex) -> {
               if(ex == null) {
                   String msg = results.stream().map(future -> {
                       try {
                           return future.get();
                       } catch (Exception iex) {
                           return iex.toString();
                       }
                   }).collect(Collectors.joining("\n"));
                   System.out.println("result = " + msg);
               } else {
                   System.err.println(ex);
               }
            });
        }

        private CompletableFuture<String> runAsync(RunnableActivity runnable, ExecutorService executor) {
            CompletableFuture<String> cf = new CompletableFuture<>();
            executor.execute(() -> {
                runnable.run();
                cf.complete(runnable.getResult());
            });
            return cf;
        }
}

Result

Übrigens wird mit C # Task stattdessen Thread.id geteilt, aber Java scheint eindeutig ein Thread zu sein.

[23] Activity: 0
[22] Activity: 0
[21] Activity: 0
[20] Activity: 0
[24] Activity: 0
[20] Activity: 1
[23] Activity: 1
[22] Activity: 1
[24] Activity: 1
[21] Activity: 1
[20] Activity: 2
     :
[20] Activity: 97
[24] Activity: 97
[22] Activity: 97
[20] Activity: 98
[23] Activity: 98
[21] Activity: 98
[24] Activity: 98
[22] Activity: 98
[22] Activity: 99
[22] Done
[24] Activity: 99
[24] Done
[23] Activity: 99
[23] Done
[21] Activity: 99
[21] Done
[20] Activity: 99
[20] Done
result = Thread [20] has done.
Thread [21] has done.
Thread [22] has done.
Thread [23] has done.
Thread [24] has done.
Thread [21] has done.
Thread [20] has done.
Thread [22] has done.
Thread [23] has done.
Thread [24] has done.

Zusammenfassung

Um ehrlich zu sein, bin ich nach Java8 immer noch nicht zuversichtlich, dass Programme gleichzeitig ausgeführt werden. Nehmen Sie sich Zeit und lernen Sie hart. Rund um Pluralsight. Beeilen Sie sich nicht, Schritt für Schritt. Zunächst konnte ich das umsetzen, was ich ursprünglich für den ersten Schritt gehalten hatte, und bin heute glücklich.

Recommended Posts

Aktualisieren Sie Ihre Java-Kenntnisse, indem Sie einen gRPC-Server in Java schreiben (2).
Aktualisieren Sie Ihre Java-Kenntnisse, indem Sie einen gRPC-Server in Java schreiben (1)
Was ich beim Erstellen eines Servers in Java gelernt habe
Suchen Sie eine Teilmenge in Java
Erstellen Sie mit Java Ihren eigenen einfachen Server und verstehen Sie HTTP
Denken Sie an eine Java-Update-Strategie
3 Implementieren Sie einen einfachen Interpreter in Java
Ich habe ein PDF mit Java erstellt.
Eine Person, die C ++ schreibt, hat versucht, Java zu schreiben
Behandeln Sie Ihre eigenen Anmerkungen in Java
Implementieren Sie den gRPC-Client in Ruby
Ein einfaches Beispiel für Rückrufe in Java
Bleiben Sie in einem Java Primer stecken
Informationen zum Zurückgeben einer Referenz in einem Java Getter
[Java] Teilen Sie eine Zeichenfolge durch ein angegebenes Zeichen
Was ist eine Klasse in der Java-Sprache (3 /?)
Bei der Suche nach mehreren in einem Java-Array
Doppelte Karte sortiert nach Schlüssel in Java
Schriftliche Unterschiede in Ruby, PHP, Java, JS
Wichtige Punkte für die Einführung von gRPC in Java
Verstehen Sie die Java-Oberfläche auf Ihre eigene Weise
[Erstellen] Ein Memorandum über das Codieren in Java
Java erstellt eine Tabelle in einem Word-Dokument
Java erstellt ein Kreisdiagramm in Excel
Was ist eine Klasse in der Java-Sprache (1 /?)
Was ist eine Klasse in der Java-Sprache (2 /?)
Grundkenntnisse in der Java-Entwicklung Schreiben von Notizen
Erstellen Sie eine TODO-App in Java 7 Create Header
Lassen Sie uns eine Taschenrechner-App mit Java erstellen
Fortsetzung Sprechen Sie über das Schreiben von Java mit Emacs @ 2018
Abrufen des Verlaufs vom Zabbix-Server in Java
Implementieren Sie so etwas wie einen Stack in Java
Teilen Sie eine Zeichenfolge in Java mit ". (Dot)"
Erstellen einer Matrixklasse in Java Teil 1
Die Geschichte des Schreibens von Java in Emacs
Lesen und Schreiben von GZIP-Dateien in Java
Erstellen wir eine vielseitige Dateispeicher (?) - Operationsbibliothek, indem wir die Dateispeicherung / -erfassung mit Java abstrahieren
Organisieren Sie den Unterschied im Schreibkomfort zwischen dem Java-Lambda-Ausdruck und dem Kotlin-Lambda-Ausdruck.