Try bidirectional communication with gRPC Java

I often see gRPC these days, but what about two-way communication, which is one of the advantages? I couldn't find a lot of literature on Java usage, so I'll summarize it.

About this document

--I'm going to describe what to do with bidirectional communication while maintaining a TCP connection when using gRPC with Java. --I will leave the explanation of gRPC settings and Protocol Buffer to related materials.

Environment used

Execution environment, etc. version
JDK 1.8.0_121
Kotlin 1.1.2
gRPC 1.5.0

What is Ichiou gRPC?

Rough features.

--RPC framework developed by Google (Official tutorials using Java here) --Unified calling between multiple languages is possible using Protocol Buffer --High-speed, two-way communication is possible using HTTP / 2

What to do with two-way communication?

For the time being, the point is ** add a stram to the proto file **

Two-way communication is possible by properly processing StreamObservable in the method associated with the completed stub class.

Simple Request / Response

First, let's look at an example of a common 1 request and 1 response. Create a simple service that takes the string message and returns the inverted string result

image

 +----------+                +---------------+
 |          |---  Request -->|               |
 |  Client  |                |     Server    |
 |          |<-- Response ---|               |
 +----------+                +---------------+
syntax = "proto3";

message StringRequest {
    string message = 1;
}

message ReverseReply {
    string result = 1;
}

service ReverseString {
    rpc Exec (StringRequest) returns (ReverseReply) {}
}

You don't have to do anything here, I'll show you how the code automatically generated by Protocol Buffer looks like this:

public static abstract class ReverseStringImplBase implements io.grpc.BindableService {

    public void exec(StringRequest request,
        io.grpc.stub.StreamObserver<ReverseReply> responseObserver) {
      asyncUnimplementedUnaryCall(METHOD_EXEC, responseObserver);
    }

    ...
}

This is a code excerpt on the Server side. Here it is assumed to boot on gRPC default port 6565 The server side describes the response using StreamObserver, but it seems that the call of ʻonNext` is allowed only once. (I tried calling it twice, but the process was blocked and it didn't work)

class ReverseStringService : ReverseStringGrpc.ReverseStringImplBase() {
    override fun exec(request: StringRequest?, responseObserver: StreamObserver<ReverseReply>?) {
        println("req : " + request?.message);
        val reply = ReverseReply.newBuilder().setResult(request?.message?.reversed()).build()

        responseObserver?.onNext(reply)
        responseObserver?.onCompleted()
    }
}

Client implementation of asynchronous processing to match the following description

val channel = ManagedChannelBuilder.forAddress("localhost", 6565)
                .usePlaintext(true)
                .build()

val stub = ReverseStringGrpc.newStub(channel)
val message = StringRequest.newBuilder().setMessage("morimori").build()

stub.exec(message, object : StreamObserver<ReverseReply> {
    override fun onNext(reply: ReverseReply) {
        println("res : " + reply.result)
    }

    override fun onError(t: Throwable) {}

    override fun onCompleted() {
        println("complete")
    }
})

Execution result

Server

req : morimori

Client

res : iromirom
complete

Bidirectional Request / Response

I want to be able to exchange Request / Response multiple times with the same TCP connection

image

 +----------+                +---------------+
 |          |---  Request -->|               |
 |          |                |               |
 |          |<-- Response ---|               |
 |          |                |               |
 |          |---  Request -->|               |
 |  Client  |                |     Server    |
 |          |<-- Response ---|               |
 |          |                |               |
 |          |---  Request -->|               |
 |          |                |               |
 |          |<-- Response ---|               |
 +----------+                +---------------+

Added ʻExecStream`

syntax = "proto3";

...

service ReverseString {
    rpc Exec (StringRequest) returns (ReverseReply) {}
    rpc ExecStream (stream StringRequest) returns (stream ReverseReply) {}
}

A streamExec method with both arguments and return value StreamObject is added

public static abstract class ReverseStringImplBase implements io.grpc.BindableService {

    public void exec(StringRequest request,
        io.grpc.stub.StreamObserver<ReverseReply> responseObserver) {
      asyncUnimplementedUnaryCall(METHOD_EXEC, responseObserver);
    }

    public io.grpc.stub.StreamObserver<jp.a2kaido.helloworld.StringRequest> execStream(
        io.grpc.stub.StreamObserver<jp.a2kaido.helloworld.ReverseReply> responseObserver) {
      return asyncUnimplementedStreamingCall(METHOD_EXEC_STREAM, responseObserver);
    }

    ...
}

Write code that returns a StreamObservable To make it easier to understand the difference in execution results, I will return 2 Responses for 1 Request.

override fun execStream(responseObserver: StreamObserver<ReverseReply>?): StreamObserver<StringRequest> {
    return object : StreamObserver<StringRequest> {
        override fun onNext(request: StringRequest) {
            println("req : " + request?.message);
            val reply = ReverseReply.newBuilder().setResult(request?.message?.reversed()).build()

            responseObserver?.onNext(reply)
            responseObserver?.onNext(reply)
        }

        override fun onError(t: Throwable) {}

        override fun onCompleted() {
            println("complete")
            responseObserver?.onCompleted()
        }
    }
}

The implementation on the Client side has changed, and the stub type has been changed to use StreamObserver # onNext to make a request.

val channel = ManagedChannelBuilder.forAddress("localhost", 6565)
                .usePlaintext(true)
                .build()

val message_1 = StringRequest.newBuilder().setMessage("morimori1").build()
val message_2 = StringRequest.newBuilder().setMessage("morimori2").build()
val message_3 = StringRequest.newBuilder().setMessage("morimori3").build()

val stub = ReverseStringGrpc.newStub(channel)

val observer = stub.execStream(object : StreamObserver<ReverseReply> {
    override fun onNext(reply: ReverseReply) {
        println("res : " + reply.result)
    }

    override fun onError(t: Throwable?) {}

    override fun onCompleted() {
        println("complete")
    }
})

observer.onNext(message_1)
observer.onNext(message_2)
observer.onNext(message_3)
observer.onComplete()

Execution result

Server

req : morimori1
req : morimori2
req : morimori3
complete

Client

res : 1iromirom
res : 1iromirom
res : 2iromirom
res : 2iromirom
res : 3iromirom
res : 3iromirom
complete

As you can see from the sample results, the following biased communication is also possible.

Request:1、Response:N

 +----------+                +---------------+
 |          |---  Request -->|               |
 |          |                |               |
 |          |                |               |
 |  Client  |<-- Response ---|     Server    |
 |          |<-- Response ---|               |
 |          |<-- Response ---|               |
 |          |                |               |
 +----------+                +---------------+

Request:N、Response:1

 +----------+                +---------------+
 |          |---  Request -->|               |
 |          |---  Request -->|               |
 |          |---  Request -->|               |
 |  Client  |                |     Server    |
 |          |                |               |
 |          |<-- Response ---|               |
 |          |                |               |
 +----------+                +---------------+

Of course, I think it is possible to do things like Request: N, Response: N.

Summary

For two-way communication, the corresponding stub class was generated by adding stream to the ProtocolBuffer definition. Regarding the implementation, I felt that if I was aware of continuous event processing using StreamObservable, which is similar to RxJava Observable, I wouldn't be confused so much.

The impression is that it is fairly easy to write high-performance bidirectional and continuous RPC. Since gRPC can also be used with mobile apps, I wonder if incremental search can be realized efficiently.

Related Documents

-Setting memo when setting up a gRPC server with Spring Boot in Kotlin -Organization of RPC method in gRPC

Recommended Posts

Try bidirectional communication with gRPC Java
Try gRPC with Java, Maven
Try communication using gRPC on Android + Java server
[Java] JSON communication with jackson
Try DB connection with Java
Try using Redis with Java (jar)
I tried UDP communication with Java
Let's try WebSocket with Java and javascript!
Memo when HTTP communication with Java (OkHttp)
Try managing Java libraries with AWS CodeArtifact
Try using the Wii remote with Java
Try Java 8 Stream
Roughly try Java 9
Try to link Ruby and Java with Dapr
Try to implement TCP / IP + NIO with JAVA
Try debugging a Java program with VS Code
Try DI with Micronaut
Change seats with java
Install Java with Ansible
Comfortable download with JAVA
Try WildFly with Docker
Java Network Basics (Communication)
Java bidirectional map library
Download Java with Ansible
Try connecting to AzureCosmosDB Emulator for Docker with Java
Run Mosquitto with Docker and try WebSocket communication with MQTT
Let's scrape with Java! !!
Build Java with Wercker
Try building Java into a native module with GraalVM
Try Java return value
Endian conversion with JAVA
[Beginner] Try to make a simple RPG game with Java ①
Try developing a containerized Java web application with Eclipse + Codewind
Socket communication with a web browser using Java and JavaScript ②
Socket communication with a web browser using Java and JavaScript ①
Easy BDD with (Java) Spectrum?
Use Lambda Layers with Java
Java multi-project creation with Gradle
Getting Started with Java Collection
Try using RocksDB in Java
Java Config with Spring MVC
Basic Authentication with Java 11 HttpClient
Let's experiment with Java inlining
Run batch with docker-compose with Java batch
[Template] MySQL connection with Java
Rewrite Java try-catch with Optional
Install Java 7 with Homebrew (cask)
Try using GloVe with Deeplearning4j
Java to play with Function
Try using view_component with rails
Try calling JavaScript in Java
Try developing Spresense in Java (1)
Try functional type in Java! ①
[Java] JavaConfig with Static InnerClass
Let's operate Excel with Java! !!
Version control Java with SDKMAN
RSA encryption / decryption with java 8
Paging PDF with Java + PDFBox.jar
Sort strings functionally with java
Object-oriented (java) with Strike Gundam
Java version control with jenv