Ich sehe heutzutage oft gRPC, aber was ist mit der bidirektionalen Kommunikation, was einer der Vorteile ist? Ich konnte nicht viel Literatur über die Verwendung in Java finden, daher werde ich es zusammenfassen.
Ausführungsumgebung usw. | Ausführung |
---|---|
JDK | 1.8.0_121 |
Kotlin | 1.1.2 |
gRPC | 1.5.0 |
Grobe Merkmale.
Der Punkt für den Moment ist, ** einen Stram zur Protodatei hinzuzufügen **
Eine bidirektionale Kommunikation ist möglich, indem "StreamObservable" in der der abgeschlossenen Stub-Klasse zugeordneten Methode ordnungsgemäß verarbeitet wird.
Schauen wir uns zunächst ein Beispiel für eine allgemeine 1 Anfrage und 1 Antwort an. Erstellen Sie einen einfachen Dienst, der die Zeichenfolge "Nachricht" verwendet und die invertierte Zeichenfolge "Ergebnis" zurückgibt
Bild
+----------+ +---------------+
| |--- Request -->| |
| Client | | Server |
| |<-- Response ---| |
+----------+ +---------------+
syntax = "proto3";
message StringRequest {
string message = 1;
}
message ReverseReply {
string result = 1;
}
service ReverseString {
rpc Exec (StringRequest) returns (ReverseReply) {}
}
Sie müssen hier nichts tun, ich zeige Ihnen, wie der von ProtocolBuffer automatisch generierte Code aussieht:
public static abstract class ReverseStringImplBase implements io.grpc.BindableService {
public void exec(StringRequest request,
io.grpc.stub.StreamObserver<ReverseReply> responseObserver) {
asyncUnimplementedUnaryCall(METHOD_EXEC, responseObserver);
}
...
}
Dies ist ein Code-Auszug auf der Serverseite. Hier wird angenommen, dass gRPC am Standardport 6565 startet. Die Serverseite beschreibt die Antwort mit StreamObserver, aber es scheint, dass der Aufruf von "onNext" nur einmal zulässig ist. (Ich habe zweimal versucht, es aufzurufen, aber der Prozess wurde blockiert und es hat nicht funktioniert)
class ReverseStringService : ReverseStringGrpc.ReverseStringImplBase() {
override fun exec(request: StringRequest?, responseObserver: StreamObserver<ReverseReply>?) {
println("req : " + request?.message);
val reply = ReverseReply.newBuilder().setResult(request?.message?.reversed()).build()
responseObserver?.onNext(reply)
responseObserver?.onCompleted()
}
}
Client-Implementierung der asynchronen Verarbeitung gemäß der folgenden Beschreibung
val channel = ManagedChannelBuilder.forAddress("localhost", 6565)
.usePlaintext(true)
.build()
val stub = ReverseStringGrpc.newStub(channel)
val message = StringRequest.newBuilder().setMessage("morimori").build()
stub.exec(message, object : StreamObserver<ReverseReply> {
override fun onNext(reply: ReverseReply) {
println("res : " + reply.result)
}
override fun onError(t: Throwable) {}
override fun onCompleted() {
println("complete")
}
})
Server
req : morimori
Client
res : iromirom
complete
Ich möchte in der Lage sein, Request / Response mehrmals mit derselben TCP-Verbindung auszutauschen
Bild
+----------+ +---------------+
| |--- Request -->| |
| | | |
| |<-- Response ---| |
| | | |
| |--- Request -->| |
| Client | | Server |
| |<-- Response ---| |
| | | |
| |--- Request -->| |
| | | |
| |<-- Response ---| |
+----------+ +---------------+
ExecStream hinzugefügt
syntax = "proto3";
...
service ReverseString {
rpc Exec (StringRequest) returns (ReverseReply) {}
rpc ExecStream (stream StringRequest) returns (stream ReverseReply) {}
}
Eine streamExec-Methode wird mit beiden Argumenten und Rückgabewerten "StreamObject" hinzugefügt
public static abstract class ReverseStringImplBase implements io.grpc.BindableService {
public void exec(StringRequest request,
io.grpc.stub.StreamObserver<ReverseReply> responseObserver) {
asyncUnimplementedUnaryCall(METHOD_EXEC, responseObserver);
}
public io.grpc.stub.StreamObserver<jp.a2kaido.helloworld.StringRequest> execStream(
io.grpc.stub.StreamObserver<jp.a2kaido.helloworld.ReverseReply> responseObserver) {
return asyncUnimplementedStreamingCall(METHOD_EXEC_STREAM, responseObserver);
}
...
}
Schreiben Sie Code, der StreamObservable zurückgibt Versuchen Sie, 2 Antworten für 1 Anforderung zurückzugeben, um den Unterschied in den Ausführungsergebnissen besser zu verstehen.
override fun execStream(responseObserver: StreamObserver<ReverseReply>?): StreamObserver<StringRequest> {
return object : StreamObserver<StringRequest> {
override fun onNext(request: StringRequest) {
println("req : " + request?.message);
val reply = ReverseReply.newBuilder().setResult(request?.message?.reversed()).build()
responseObserver?.onNext(reply)
responseObserver?.onNext(reply)
}
override fun onError(t: Throwable) {}
override fun onCompleted() {
println("complete")
responseObserver?.onCompleted()
}
}
}
Die Implementierung auf der Clientseite wurde geändert, und der Stub-Typ wurde geändert, um "StreamObserver # onNext" zu verwenden, um eine Anforderung zu stellen.
val channel = ManagedChannelBuilder.forAddress("localhost", 6565)
.usePlaintext(true)
.build()
val message_1 = StringRequest.newBuilder().setMessage("morimori1").build()
val message_2 = StringRequest.newBuilder().setMessage("morimori2").build()
val message_3 = StringRequest.newBuilder().setMessage("morimori3").build()
val stub = ReverseStringGrpc.newStub(channel)
val observer = stub.execStream(object : StreamObserver<ReverseReply> {
override fun onNext(reply: ReverseReply) {
println("res : " + reply.result)
}
override fun onError(t: Throwable?) {}
override fun onCompleted() {
println("complete")
}
})
observer.onNext(message_1)
observer.onNext(message_2)
observer.onNext(message_3)
observer.onComplete()
Server
req : morimori1
req : morimori2
req : morimori3
complete
Client
res : 1iromirom
res : 1iromirom
res : 2iromirom
res : 2iromirom
res : 3iromirom
res : 3iromirom
complete
Wie Sie den Beispielergebnissen entnehmen können, ist auch die folgende voreingenommene Kommunikation möglich.
Request:1、Response:N
+----------+ +---------------+
| |--- Request -->| |
| | | |
| | | |
| Client |<-- Response ---| Server |
| |<-- Response ---| |
| |<-- Response ---| |
| | | |
+----------+ +---------------+
Request:N、Response:1
+----------+ +---------------+
| |--- Request -->| |
| |--- Request -->| |
| |--- Request -->| |
| Client | | Server |
| | | |
| |<-- Response ---| |
| | | |
+----------+ +---------------+
Natürlich denke ich, dass es möglich ist, Dinge wie Anfrage: N, Antwort: N zu tun.
Für die bidirektionale Kommunikation wurde die entsprechende Stub-Klasse durch Hinzufügen von "stream" zur ProtocolBuffer-Definition generiert. In Bezug auf die Implementierung hatte ich das Gefühl, dass ich nicht so verwirrt wäre, wenn ich StreamObservable verwenden würde, das RxJavas Observable ähnelt, und mir der kontinuierlichen Ereignisverarbeitung bewusst wäre.
Der Eindruck ist, dass es ziemlich einfach ist, bidirektionale und kontinuierliche Hochleistungs-RPC zu schreiben. Da gRPC auch in mobilen Apps verwendet werden kann, frage ich mich, ob eine inkrementelle Suche effizient durchgeführt werden kann.
Recommended Posts