Essayez la communication bidirectionnelle avec gRPC Java

Je vois souvent gRPC de nos jours, mais qu'en est-il de la communication bidirectionnelle, qui est l'un des avantages? Je n'ai pas trouvé beaucoup de littérature sur son utilisation en Java, donc je vais le résumer.

À propos de ce document

Environnement utilisé

Environnement d'exécution, etc. version
JDK 1.8.0_121
Kotlin 1.1.2
gRPC 1.5.0

Qu'est-ce que le gRPC Ichiou?

Caractéristiques approximatives.

--RPC framework développé par Google (Tutoriel officiel utilisant Java ici)

Que faire de la communication bidirectionnelle?

Le point pour le moment est de ** ajouter un stram au fichier proto **

La communication bidirectionnelle est possible en traitant correctement StreamObservable dans la méthode associée à la classe stub terminée.

Demande / réponse simple

Tout d'abord, examinons un exemple d'une demande commune et d'une réponse. Créez un service simple qui prend la chaîne message et renvoie la chaîne inversée result

image

 +----------+                +---------------+
 |          |---  Request -->|               |
 |  Client  |                |     Server    |
 |          |<-- Response ---|               |
 +----------+                +---------------+
syntax = "proto3";

message StringRequest {
    string message = 1;
}

message ReverseReply {
    string result = 1;
}

service ReverseString {
    rpc Exec (StringRequest) returns (ReverseReply) {}
}

Vous n'avez rien à faire ici, je vais vous montrer à quoi ressemble le code généré automatiquement par ProtocolBuffer:

public static abstract class ReverseStringImplBase implements io.grpc.BindableService {

    public void exec(StringRequest request,
        io.grpc.stub.StreamObserver<ReverseReply> responseObserver) {
      asyncUnimplementedUnaryCall(METHOD_EXEC, responseObserver);
    }

    ...
}

Ceci est un extrait de code côté serveur. Ici, il est supposé démarrer avec le port 6565 par défaut de gRPC. Le côté serveur décrit la réponse en utilisant StreamObserver, mais il semble que l'appel de ʻonNext` n'est autorisé qu'une seule fois. (J'ai essayé de l'appeler deux fois, mais le processus a été bloqué et cela n'a pas fonctionné)

class ReverseStringService : ReverseStringGrpc.ReverseStringImplBase() {
    override fun exec(request: StringRequest?, responseObserver: StreamObserver<ReverseReply>?) {
        println("req : " + request?.message);
        val reply = ReverseReply.newBuilder().setResult(request?.message?.reversed()).build()

        responseObserver?.onNext(reply)
        responseObserver?.onCompleted()
    }
}

Implémentation client du traitement asynchrone pour correspondre à la description suivante

val channel = ManagedChannelBuilder.forAddress("localhost", 6565)
                .usePlaintext(true)
                .build()

val stub = ReverseStringGrpc.newStub(channel)
val message = StringRequest.newBuilder().setMessage("morimori").build()

stub.exec(message, object : StreamObserver<ReverseReply> {
    override fun onNext(reply: ReverseReply) {
        println("res : " + reply.result)
    }

    override fun onError(t: Throwable) {}

    override fun onCompleted() {
        println("complete")
    }
})

Résultat d'exécution

Server

req : morimori

Client

res : iromirom
complete

Demande / réponse bidirectionnelle

Je souhaite pouvoir échanger plusieurs fois Requête / Réponse avec la même connexion TCP

image

 +----------+                +---------------+
 |          |---  Request -->|               |
 |          |                |               |
 |          |<-- Response ---|               |
 |          |                |               |
 |          |---  Request -->|               |
 |  Client  |                |     Server    |
 |          |<-- Response ---|               |
 |          |                |               |
 |          |---  Request -->|               |
 |          |                |               |
 |          |<-- Response ---|               |
 +----------+                +---------------+

Ajout de ʻExecStream`

syntax = "proto3";

...

service ReverseString {
    rpc Exec (StringRequest) returns (ReverseReply) {}
    rpc ExecStream (stream StringRequest) returns (stream ReverseReply) {}
}

Une méthode streamExec est ajoutée avec les arguments et les valeurs de retour StreamObject

public static abstract class ReverseStringImplBase implements io.grpc.BindableService {

    public void exec(StringRequest request,
        io.grpc.stub.StreamObserver<ReverseReply> responseObserver) {
      asyncUnimplementedUnaryCall(METHOD_EXEC, responseObserver);
    }

    public io.grpc.stub.StreamObserver<jp.a2kaido.helloworld.StringRequest> execStream(
        io.grpc.stub.StreamObserver<jp.a2kaido.helloworld.ReverseReply> responseObserver) {
      return asyncUnimplementedStreamingCall(METHOD_EXEC_STREAM, responseObserver);
    }

    ...
}

Ecrire du code qui renvoie StreamObservable Pour faciliter la compréhension de la différence entre les résultats d'exécution, essayez de renvoyer 2 réponses pour 1 requête

override fun execStream(responseObserver: StreamObserver<ReverseReply>?): StreamObserver<StringRequest> {
    return object : StreamObserver<StringRequest> {
        override fun onNext(request: StringRequest) {
            println("req : " + request?.message);
            val reply = ReverseReply.newBuilder().setResult(request?.message?.reversed()).build()

            responseObserver?.onNext(reply)
            responseObserver?.onNext(reply)
        }

        override fun onError(t: Throwable) {}

        override fun onCompleted() {
            println("complete")
            responseObserver?.onCompleted()
        }
    }
}

L'implémentation côté client a changé et le type de stub a été changé pour utiliser StreamObserver # onNext pour faire une requête.

val channel = ManagedChannelBuilder.forAddress("localhost", 6565)
                .usePlaintext(true)
                .build()

val message_1 = StringRequest.newBuilder().setMessage("morimori1").build()
val message_2 = StringRequest.newBuilder().setMessage("morimori2").build()
val message_3 = StringRequest.newBuilder().setMessage("morimori3").build()

val stub = ReverseStringGrpc.newStub(channel)

val observer = stub.execStream(object : StreamObserver<ReverseReply> {
    override fun onNext(reply: ReverseReply) {
        println("res : " + reply.result)
    }

    override fun onError(t: Throwable?) {}

    override fun onCompleted() {
        println("complete")
    }
})

observer.onNext(message_1)
observer.onNext(message_2)
observer.onNext(message_3)
observer.onComplete()

Résultat d'exécution

Server

req : morimori1
req : morimori2
req : morimori3
complete

Client

res : 1iromirom
res : 1iromirom
res : 2iromirom
res : 2iromirom
res : 3iromirom
res : 3iromirom
complete

Comme vous pouvez le voir à partir des exemples de résultats, la communication biaisée suivante est également possible.

Request:1、Response:N

 +----------+                +---------------+
 |          |---  Request -->|               |
 |          |                |               |
 |          |                |               |
 |  Client  |<-- Response ---|     Server    |
 |          |<-- Response ---|               |
 |          |<-- Response ---|               |
 |          |                |               |
 +----------+                +---------------+

Request:N、Response:1

 +----------+                +---------------+
 |          |---  Request -->|               |
 |          |---  Request -->|               |
 |          |---  Request -->|               |
 |  Client  |                |     Server    |
 |          |                |               |
 |          |<-- Response ---|               |
 |          |                |               |
 +----------+                +---------------+

Bien sûr, je pense qu'il est possible de faire des choses comme Request: N, Response: N.

Résumé

Pour la communication bidirectionnelle, la classe de stub correspondante a été générée en ajoutant stream à la définition ProtocolBuffer. En ce qui concerne l'implémentation, j'ai senti que je ne serais pas si confus si j'étais au courant du traitement continu des événements à l'aide de StreamObservable, qui est similaire à Observable de RxJava.

L'impression est qu'il est assez facile d'écrire des RPC bidirectionnels et continus hautes performances. Étant donné que gRPC peut également être utilisé dans les applications mobiles, je me demande si la recherche incrémentielle peut être réalisée efficacement.

Documents connexes

Recommended Posts

Essayez la communication bidirectionnelle avec gRPC Java
Essayez gRPC avec Java, Maven
Essayez la communication en utilisant gRPC sur un serveur Android + Java
[Java] Communication JSON avec jackson
Essayez la connexion DB avec Java
Essayez d'utiliser Redis avec Java (jar)
J'ai essayé la communication UDP avec Java
Essayons WebSocket avec Java et javascript!
Remarques sur la communication HTTP avec Java (OkHttp)
Essayez de gérer les bibliothèques Java avec AWS CodeArtifact
Essayez d'utiliser la télécommande Wii en Java
Essayez Java 8 Stream
Essayez grossièrement Java 9
Essayez d'intégrer Ruby et Java avec Dapr
Essayez d'implémenter TCP / IP + NIO avec JAVA
Essayez de déboguer un programme Java avec VS Code
Essayez DI avec Micronaut
Changer de siège avec Java
Installez Java avec Ansible
Téléchargement confortable avec JAVA
Principes de base du réseau Java (communication)
Bibliothèque de cartes bidirectionnelles Java
Téléchargement Java avec Ansible
Essayez de vous connecter à l'émulateur AzureCosmosDB pour Docker avec Java
Exécutez Mosquitto avec Docker et essayez la communication WebSocket avec MQTT
Raclons avec Java! !!
Construire Java avec Wercker
Essayez de créer Java dans un module natif avec GraalVM
Essayez la valeur de retour Java
Conversion Endian avec JAVA
[Débutant] Essayez de créer un jeu RPG simple avec Java ①
Communication socket avec un navigateur Web utilisant Java et JavaScript ②
Communication socket avec un navigateur Web utilisant Java et JavaScript ①
(Java) BDD facile avec Spectrum?
Utiliser des couches Lambda avec Java
Créer un multi-projet Java avec Gradle
Premiers pas avec Java Collection
Essayez d'utiliser RocksDB avec Java
Configuration Java avec Spring MVC
Authentification de base avec Java 11 HttpClient
Expérimentons l'expansion en ligne Java
Exécuter un lot avec docker-compose avec Java batch
[Template] Connexion MySQL avec Java
Réécrire Java try-catch avec facultatif
Installez Java 7 avec Homebrew (cask)
Essayez d'utiliser GloVe avec Deeplearning4j
Java pour jouer avec Function
Essayez d'appeler JavaScript en Java
Essayez de développer Spresense avec Java (1)
Essayez le type fonctionnel en Java! ①
[Java] JavaConfig avec classe interne statique
Exploitons Excel avec Java! !!
Gestion des versions Java avec SDKMAN
Cryptage / décryptage RSA avec Java 8
Pagination de PDF avec Java + PDFBox.jar
Trier les chaînes comme une fonction caractéristique avec Java
Orienté objet avec Strike Gundam (java)
Gestion des versions Java avec jenv