Versuchen Sie die bidirektionale Kommunikation mit gRPC Java

Ich sehe heutzutage oft gRPC, aber was ist mit der bidirektionalen Kommunikation, was einer der Vorteile ist? Ich konnte nicht viel Literatur über die Verwendung in Java finden, daher werde ich es zusammenfassen.

Über dieses Dokument

Umgebung verwendet

Ausführungsumgebung usw. Ausführung
JDK 1.8.0_121
Kotlin 1.1.2
gRPC 1.5.0

Was ist Ichiou gRPC?

Grobe Merkmale.

Was tun mit bidirektionaler Kommunikation?

Der Punkt für den Moment ist, ** einen Stram zur Protodatei hinzuzufügen **

Eine bidirektionale Kommunikation ist möglich, indem "StreamObservable" in der der abgeschlossenen Stub-Klasse zugeordneten Methode ordnungsgemäß verarbeitet wird.

Einfache Anfrage / Antwort

Schauen wir uns zunächst ein Beispiel für eine allgemeine 1 Anfrage und 1 Antwort an. Erstellen Sie einen einfachen Dienst, der die Zeichenfolge "Nachricht" verwendet und die invertierte Zeichenfolge "Ergebnis" zurückgibt

Bild

 +----------+                +---------------+
 |          |---  Request -->|               |
 |  Client  |                |     Server    |
 |          |<-- Response ---|               |
 +----------+                +---------------+
syntax = "proto3";

message StringRequest {
    string message = 1;
}

message ReverseReply {
    string result = 1;
}

service ReverseString {
    rpc Exec (StringRequest) returns (ReverseReply) {}
}

Sie müssen hier nichts tun, ich zeige Ihnen, wie der von ProtocolBuffer automatisch generierte Code aussieht:

public static abstract class ReverseStringImplBase implements io.grpc.BindableService {

    public void exec(StringRequest request,
        io.grpc.stub.StreamObserver<ReverseReply> responseObserver) {
      asyncUnimplementedUnaryCall(METHOD_EXEC, responseObserver);
    }

    ...
}

Dies ist ein Code-Auszug auf der Serverseite. Hier wird angenommen, dass gRPC am Standardport 6565 startet. Die Serverseite beschreibt die Antwort mit StreamObserver, aber es scheint, dass der Aufruf von "onNext" nur einmal zulässig ist. (Ich habe zweimal versucht, es aufzurufen, aber der Prozess wurde blockiert und es hat nicht funktioniert)

class ReverseStringService : ReverseStringGrpc.ReverseStringImplBase() {
    override fun exec(request: StringRequest?, responseObserver: StreamObserver<ReverseReply>?) {
        println("req : " + request?.message);
        val reply = ReverseReply.newBuilder().setResult(request?.message?.reversed()).build()

        responseObserver?.onNext(reply)
        responseObserver?.onCompleted()
    }
}

Client-Implementierung der asynchronen Verarbeitung gemäß der folgenden Beschreibung

val channel = ManagedChannelBuilder.forAddress("localhost", 6565)
                .usePlaintext(true)
                .build()

val stub = ReverseStringGrpc.newStub(channel)
val message = StringRequest.newBuilder().setMessage("morimori").build()

stub.exec(message, object : StreamObserver<ReverseReply> {
    override fun onNext(reply: ReverseReply) {
        println("res : " + reply.result)
    }

    override fun onError(t: Throwable) {}

    override fun onCompleted() {
        println("complete")
    }
})

Ausführungsergebnis

Server

req : morimori

Client

res : iromirom
complete

Bidirektionale Anfrage / Antwort

Ich möchte in der Lage sein, Request / Response mehrmals mit derselben TCP-Verbindung auszutauschen

Bild

 +----------+                +---------------+
 |          |---  Request -->|               |
 |          |                |               |
 |          |<-- Response ---|               |
 |          |                |               |
 |          |---  Request -->|               |
 |  Client  |                |     Server    |
 |          |<-- Response ---|               |
 |          |                |               |
 |          |---  Request -->|               |
 |          |                |               |
 |          |<-- Response ---|               |
 +----------+                +---------------+

ExecStream hinzugefügt

syntax = "proto3";

...

service ReverseString {
    rpc Exec (StringRequest) returns (ReverseReply) {}
    rpc ExecStream (stream StringRequest) returns (stream ReverseReply) {}
}

Eine streamExec-Methode wird mit beiden Argumenten und Rückgabewerten "StreamObject" hinzugefügt

public static abstract class ReverseStringImplBase implements io.grpc.BindableService {

    public void exec(StringRequest request,
        io.grpc.stub.StreamObserver<ReverseReply> responseObserver) {
      asyncUnimplementedUnaryCall(METHOD_EXEC, responseObserver);
    }

    public io.grpc.stub.StreamObserver<jp.a2kaido.helloworld.StringRequest> execStream(
        io.grpc.stub.StreamObserver<jp.a2kaido.helloworld.ReverseReply> responseObserver) {
      return asyncUnimplementedStreamingCall(METHOD_EXEC_STREAM, responseObserver);
    }

    ...
}

Schreiben Sie Code, der StreamObservable zurückgibt Versuchen Sie, 2 Antworten für 1 Anforderung zurückzugeben, um den Unterschied in den Ausführungsergebnissen besser zu verstehen.

override fun execStream(responseObserver: StreamObserver<ReverseReply>?): StreamObserver<StringRequest> {
    return object : StreamObserver<StringRequest> {
        override fun onNext(request: StringRequest) {
            println("req : " + request?.message);
            val reply = ReverseReply.newBuilder().setResult(request?.message?.reversed()).build()

            responseObserver?.onNext(reply)
            responseObserver?.onNext(reply)
        }

        override fun onError(t: Throwable) {}

        override fun onCompleted() {
            println("complete")
            responseObserver?.onCompleted()
        }
    }
}

Die Implementierung auf der Clientseite wurde geändert, und der Stub-Typ wurde geändert, um "StreamObserver # onNext" zu verwenden, um eine Anforderung zu stellen.

val channel = ManagedChannelBuilder.forAddress("localhost", 6565)
                .usePlaintext(true)
                .build()

val message_1 = StringRequest.newBuilder().setMessage("morimori1").build()
val message_2 = StringRequest.newBuilder().setMessage("morimori2").build()
val message_3 = StringRequest.newBuilder().setMessage("morimori3").build()

val stub = ReverseStringGrpc.newStub(channel)

val observer = stub.execStream(object : StreamObserver<ReverseReply> {
    override fun onNext(reply: ReverseReply) {
        println("res : " + reply.result)
    }

    override fun onError(t: Throwable?) {}

    override fun onCompleted() {
        println("complete")
    }
})

observer.onNext(message_1)
observer.onNext(message_2)
observer.onNext(message_3)
observer.onComplete()

Ausführungsergebnis

Server

req : morimori1
req : morimori2
req : morimori3
complete

Client

res : 1iromirom
res : 1iromirom
res : 2iromirom
res : 2iromirom
res : 3iromirom
res : 3iromirom
complete

Wie Sie den Beispielergebnissen entnehmen können, ist auch die folgende voreingenommene Kommunikation möglich.

Request:1、Response:N

 +----------+                +---------------+
 |          |---  Request -->|               |
 |          |                |               |
 |          |                |               |
 |  Client  |<-- Response ---|     Server    |
 |          |<-- Response ---|               |
 |          |<-- Response ---|               |
 |          |                |               |
 +----------+                +---------------+

Request:N、Response:1

 +----------+                +---------------+
 |          |---  Request -->|               |
 |          |---  Request -->|               |
 |          |---  Request -->|               |
 |  Client  |                |     Server    |
 |          |                |               |
 |          |<-- Response ---|               |
 |          |                |               |
 +----------+                +---------------+

Natürlich denke ich, dass es möglich ist, Dinge wie Anfrage: N, Antwort: N zu tun.

Zusammenfassung

Für die bidirektionale Kommunikation wurde die entsprechende Stub-Klasse durch Hinzufügen von "stream" zur ProtocolBuffer-Definition generiert. In Bezug auf die Implementierung hatte ich das Gefühl, dass ich nicht so verwirrt wäre, wenn ich StreamObservable verwenden würde, das RxJavas Observable ähnelt, und mir der kontinuierlichen Ereignisverarbeitung bewusst wäre.

Der Eindruck ist, dass es ziemlich einfach ist, bidirektionale und kontinuierliche Hochleistungs-RPC zu schreiben. Da gRPC auch in mobilen Apps verwendet werden kann, frage ich mich, ob eine inkrementelle Suche effizient durchgeführt werden kann.

Zugehörige Dokumente

Recommended Posts

Versuchen Sie die bidirektionale Kommunikation mit gRPC Java
Versuchen Sie gRPC mit Java, Maven
Versuchen Sie die Kommunikation mit gRPC auf einem Android + Java-Server
[Java] JSON-Kommunikation mit Jackson
Versuchen Sie eine DB-Verbindung mit Java
Versuchen Sie es mit Redis mit Java (jar)
Ich habe versucht, UDP mit Java zu kommunizieren
Probieren wir WebSocket mit Java und Javascript aus!
Hinweise zur HTTP-Kommunikation mit Java (OkHttp)
Versuchen Sie, Java-Bibliotheken mit AWS CodeArtifact zu verwalten
Versuchen Sie es mit der Wii-Fernbedienung in Java
Probieren Sie Java 8 Stream aus
Versuchen Sie es mit Java 9
Versuchen Sie, Ruby und Java in Dapr zu integrieren
Versuchen Sie, TCP / IP + NIO mit JAVA zu implementieren
Versuchen Sie, ein Java-Programm mit VS-Code zu debuggen
Versuchen Sie DI mit Micronaut
Wechseln Sie die Plätze mit Java
Installieren Sie Java mit Ansible
Bequemer Download mit JAVA
Java Network Basics (Kommunikation)
Bidirektionale Java-Kartenbibliothek
Java-Download mit Ansible
Versuchen Sie, mit Java eine Verbindung zu AzureCosmosDB Emulator for Docker herzustellen
Führen Sie Mosquitto mit Docker aus und versuchen Sie die WebSocket-Kommunikation mit MQTT
Lass uns mit Java kratzen! !!
Erstellen Sie Java mit Wercker
Versuchen Sie, Java mit GraalVM in ein natives Modul zu integrieren
Versuchen Sie es mit dem Java-Rückgabewert
Endian-Konvertierung mit JAVA
[Anfänger] Versuchen Sie, mit Java ein einfaches RPG-Spiel zu erstellen ①
Socket-Kommunikation mit einem Webbrowser über Java und JavaScript ②
Socket-Kommunikation mit einem Webbrowser über Java und JavaScript ①
(Java) Einfache BDD mit Spektrum?
Verwenden Sie Lambda-Ebenen mit Java
Erstellen Sie mit Gradle ein Java-Multiprojekt
Erste Schritte mit Java Collection
Versuchen Sie es mit RocksDB mit Java
Java-Konfiguration mit Spring MVC
Grundlegende Authentifizierung mit Java 11 HttpClient
Experimentieren wir mit der Java-Inline-Erweiterung
Führen Sie Batch mit Docker-Compose mit Java-Batch aus
[Vorlage] MySQL-Verbindung mit Java
Schreiben Sie Java Try-Catch mit Optional neu
Installieren Sie Java 7 mit Homebrew (Fass)
Versuchen Sie es mit GloVe mit Deeplearning4j
Java zum Spielen mit Function
Versuchen Sie, JavaScript in Java aufzurufen
Lassen Sie uns Spresense mit Java entwickeln (1)
Probieren Sie den Funktionstyp in Java aus! ①
[Java] JavaConfig mit statischer innerer Klasse
Lassen Sie uns Excel mit Java betreiben! !!
Java-Versionsverwaltung mit SDKMAN
RSA-Verschlüsselung / Entschlüsselung mit Java 8
Paging PDF mit Java + PDFBox.jar
Sortieren Sie Zeichenfolgen als charakteristische Funktion mit Java
Objektorientiert mit Strike Gundam (Java)
Java-Versionsverwaltung mit jenv