Je vois souvent gRPC de nos jours, mais qu'en est-il de la communication bidirectionnelle, qui est l'un des avantages? Je n'ai pas trouvé beaucoup de littérature sur son utilisation en Java, donc je vais le résumer.
Environnement d'exécution, etc. | version |
---|---|
JDK | 1.8.0_121 |
Kotlin | 1.1.2 |
gRPC | 1.5.0 |
Caractéristiques approximatives.
--RPC framework développé par Google (Tutoriel officiel utilisant Java ici)
Le point pour le moment est de ** ajouter un stram au fichier proto **
La communication bidirectionnelle est possible en traitant correctement StreamObservable
dans la méthode associée à la classe stub terminée.
Tout d'abord, examinons un exemple d'une demande commune et d'une réponse.
Créez un service simple qui prend la chaîne message
et renvoie la chaîne inversée result
image
+----------+ +---------------+
| |--- Request -->| |
| Client | | Server |
| |<-- Response ---| |
+----------+ +---------------+
syntax = "proto3";
message StringRequest {
string message = 1;
}
message ReverseReply {
string result = 1;
}
service ReverseString {
rpc Exec (StringRequest) returns (ReverseReply) {}
}
Vous n'avez rien à faire ici, je vais vous montrer à quoi ressemble le code généré automatiquement par ProtocolBuffer:
public static abstract class ReverseStringImplBase implements io.grpc.BindableService {
public void exec(StringRequest request,
io.grpc.stub.StreamObserver<ReverseReply> responseObserver) {
asyncUnimplementedUnaryCall(METHOD_EXEC, responseObserver);
}
...
}
Ceci est un extrait de code côté serveur. Ici, il est supposé démarrer avec le port 6565 par défaut de gRPC. Le côté serveur décrit la réponse en utilisant StreamObserver, mais il semble que l'appel de ʻonNext` n'est autorisé qu'une seule fois. (J'ai essayé de l'appeler deux fois, mais le processus a été bloqué et cela n'a pas fonctionné)
class ReverseStringService : ReverseStringGrpc.ReverseStringImplBase() {
override fun exec(request: StringRequest?, responseObserver: StreamObserver<ReverseReply>?) {
println("req : " + request?.message);
val reply = ReverseReply.newBuilder().setResult(request?.message?.reversed()).build()
responseObserver?.onNext(reply)
responseObserver?.onCompleted()
}
}
Implémentation client du traitement asynchrone pour correspondre à la description suivante
val channel = ManagedChannelBuilder.forAddress("localhost", 6565)
.usePlaintext(true)
.build()
val stub = ReverseStringGrpc.newStub(channel)
val message = StringRequest.newBuilder().setMessage("morimori").build()
stub.exec(message, object : StreamObserver<ReverseReply> {
override fun onNext(reply: ReverseReply) {
println("res : " + reply.result)
}
override fun onError(t: Throwable) {}
override fun onCompleted() {
println("complete")
}
})
Server
req : morimori
Client
res : iromirom
complete
Je souhaite pouvoir échanger plusieurs fois Requête / Réponse avec la même connexion TCP
image
+----------+ +---------------+
| |--- Request -->| |
| | | |
| |<-- Response ---| |
| | | |
| |--- Request -->| |
| Client | | Server |
| |<-- Response ---| |
| | | |
| |--- Request -->| |
| | | |
| |<-- Response ---| |
+----------+ +---------------+
Ajout de ʻExecStream`
syntax = "proto3";
...
service ReverseString {
rpc Exec (StringRequest) returns (ReverseReply) {}
rpc ExecStream (stream StringRequest) returns (stream ReverseReply) {}
}
Une méthode streamExec est ajoutée avec les arguments et les valeurs de retour StreamObject
public static abstract class ReverseStringImplBase implements io.grpc.BindableService {
public void exec(StringRequest request,
io.grpc.stub.StreamObserver<ReverseReply> responseObserver) {
asyncUnimplementedUnaryCall(METHOD_EXEC, responseObserver);
}
public io.grpc.stub.StreamObserver<jp.a2kaido.helloworld.StringRequest> execStream(
io.grpc.stub.StreamObserver<jp.a2kaido.helloworld.ReverseReply> responseObserver) {
return asyncUnimplementedStreamingCall(METHOD_EXEC_STREAM, responseObserver);
}
...
}
Ecrire du code qui renvoie StreamObservable Pour faciliter la compréhension de la différence entre les résultats d'exécution, essayez de renvoyer 2 réponses pour 1 requête
override fun execStream(responseObserver: StreamObserver<ReverseReply>?): StreamObserver<StringRequest> {
return object : StreamObserver<StringRequest> {
override fun onNext(request: StringRequest) {
println("req : " + request?.message);
val reply = ReverseReply.newBuilder().setResult(request?.message?.reversed()).build()
responseObserver?.onNext(reply)
responseObserver?.onNext(reply)
}
override fun onError(t: Throwable) {}
override fun onCompleted() {
println("complete")
responseObserver?.onCompleted()
}
}
}
L'implémentation côté client a changé et le type de stub a été changé pour utiliser StreamObserver # onNext
pour faire une requête.
val channel = ManagedChannelBuilder.forAddress("localhost", 6565)
.usePlaintext(true)
.build()
val message_1 = StringRequest.newBuilder().setMessage("morimori1").build()
val message_2 = StringRequest.newBuilder().setMessage("morimori2").build()
val message_3 = StringRequest.newBuilder().setMessage("morimori3").build()
val stub = ReverseStringGrpc.newStub(channel)
val observer = stub.execStream(object : StreamObserver<ReverseReply> {
override fun onNext(reply: ReverseReply) {
println("res : " + reply.result)
}
override fun onError(t: Throwable?) {}
override fun onCompleted() {
println("complete")
}
})
observer.onNext(message_1)
observer.onNext(message_2)
observer.onNext(message_3)
observer.onComplete()
Server
req : morimori1
req : morimori2
req : morimori3
complete
Client
res : 1iromirom
res : 1iromirom
res : 2iromirom
res : 2iromirom
res : 3iromirom
res : 3iromirom
complete
Comme vous pouvez le voir à partir des exemples de résultats, la communication biaisée suivante est également possible.
Request:1、Response:N
+----------+ +---------------+
| |--- Request -->| |
| | | |
| | | |
| Client |<-- Response ---| Server |
| |<-- Response ---| |
| |<-- Response ---| |
| | | |
+----------+ +---------------+
Request:N、Response:1
+----------+ +---------------+
| |--- Request -->| |
| |--- Request -->| |
| |--- Request -->| |
| Client | | Server |
| | | |
| |<-- Response ---| |
| | | |
+----------+ +---------------+
Bien sûr, je pense qu'il est possible de faire des choses comme Request: N, Response: N.
Pour la communication bidirectionnelle, la classe de stub correspondante a été générée en ajoutant stream
à la définition ProtocolBuffer.
En ce qui concerne l'implémentation, j'ai senti que je ne serais pas si confus si j'étais au courant du traitement continu des événements à l'aide de StreamObservable, qui est similaire à Observable de RxJava.
L'impression est qu'il est assez facile d'écrire des RPC bidirectionnels et continus hautes performances. Étant donné que gRPC peut également être utilisé dans les applications mobiles, je me demande si la recherche incrémentielle peut être réalisée efficacement.
Recommended Posts