I often see gRPC these days, but what about two-way communication, which is one of the advantages? I couldn't find a lot of literature on Java usage, so I'll summarize it.
--I'm going to describe what to do with bidirectional communication while maintaining a TCP connection when using gRPC with Java. --I will leave the explanation of gRPC settings and Protocol Buffer to related materials.
Execution environment, etc. | version |
---|---|
JDK | 1.8.0_121 |
Kotlin | 1.1.2 |
gRPC | 1.5.0 |
Rough features.
--RPC framework developed by Google (Official tutorials using Java here) --Unified calling between multiple languages is possible using Protocol Buffer --High-speed, two-way communication is possible using HTTP / 2
For the time being, the point is ** add a stram to the proto file **
Two-way communication is possible by properly processing StreamObservable
in the method associated with the completed stub class.
First, let's look at an example of a common 1 request and 1 response.
Create a simple service that takes the string message
and returns the inverted string result
image
+----------+ +---------------+
| |--- Request -->| |
| Client | | Server |
| |<-- Response ---| |
+----------+ +---------------+
syntax = "proto3";
message StringRequest {
string message = 1;
}
message ReverseReply {
string result = 1;
}
service ReverseString {
rpc Exec (StringRequest) returns (ReverseReply) {}
}
You don't have to do anything here, I'll show you how the code automatically generated by Protocol Buffer looks like this:
public static abstract class ReverseStringImplBase implements io.grpc.BindableService {
public void exec(StringRequest request,
io.grpc.stub.StreamObserver<ReverseReply> responseObserver) {
asyncUnimplementedUnaryCall(METHOD_EXEC, responseObserver);
}
...
}
This is a code excerpt on the Server side. Here it is assumed to boot on gRPC default port 6565 The server side describes the response using StreamObserver, but it seems that the call of ʻonNext` is allowed only once. (I tried calling it twice, but the process was blocked and it didn't work)
class ReverseStringService : ReverseStringGrpc.ReverseStringImplBase() {
override fun exec(request: StringRequest?, responseObserver: StreamObserver<ReverseReply>?) {
println("req : " + request?.message);
val reply = ReverseReply.newBuilder().setResult(request?.message?.reversed()).build()
responseObserver?.onNext(reply)
responseObserver?.onCompleted()
}
}
Client implementation of asynchronous processing to match the following description
val channel = ManagedChannelBuilder.forAddress("localhost", 6565)
.usePlaintext(true)
.build()
val stub = ReverseStringGrpc.newStub(channel)
val message = StringRequest.newBuilder().setMessage("morimori").build()
stub.exec(message, object : StreamObserver<ReverseReply> {
override fun onNext(reply: ReverseReply) {
println("res : " + reply.result)
}
override fun onError(t: Throwable) {}
override fun onCompleted() {
println("complete")
}
})
Server
req : morimori
Client
res : iromirom
complete
I want to be able to exchange Request / Response multiple times with the same TCP connection
image
+----------+ +---------------+
| |--- Request -->| |
| | | |
| |<-- Response ---| |
| | | |
| |--- Request -->| |
| Client | | Server |
| |<-- Response ---| |
| | | |
| |--- Request -->| |
| | | |
| |<-- Response ---| |
+----------+ +---------------+
Added ʻExecStream`
syntax = "proto3";
...
service ReverseString {
rpc Exec (StringRequest) returns (ReverseReply) {}
rpc ExecStream (stream StringRequest) returns (stream ReverseReply) {}
}
A streamExec method with both arguments and return value StreamObject
is added
public static abstract class ReverseStringImplBase implements io.grpc.BindableService {
public void exec(StringRequest request,
io.grpc.stub.StreamObserver<ReverseReply> responseObserver) {
asyncUnimplementedUnaryCall(METHOD_EXEC, responseObserver);
}
public io.grpc.stub.StreamObserver<jp.a2kaido.helloworld.StringRequest> execStream(
io.grpc.stub.StreamObserver<jp.a2kaido.helloworld.ReverseReply> responseObserver) {
return asyncUnimplementedStreamingCall(METHOD_EXEC_STREAM, responseObserver);
}
...
}
Write code that returns a StreamObservable To make it easier to understand the difference in execution results, I will return 2 Responses for 1 Request.
override fun execStream(responseObserver: StreamObserver<ReverseReply>?): StreamObserver<StringRequest> {
return object : StreamObserver<StringRequest> {
override fun onNext(request: StringRequest) {
println("req : " + request?.message);
val reply = ReverseReply.newBuilder().setResult(request?.message?.reversed()).build()
responseObserver?.onNext(reply)
responseObserver?.onNext(reply)
}
override fun onError(t: Throwable) {}
override fun onCompleted() {
println("complete")
responseObserver?.onCompleted()
}
}
}
The implementation on the Client side has changed, and the stub type has been changed to use StreamObserver # onNext
to make a request.
val channel = ManagedChannelBuilder.forAddress("localhost", 6565)
.usePlaintext(true)
.build()
val message_1 = StringRequest.newBuilder().setMessage("morimori1").build()
val message_2 = StringRequest.newBuilder().setMessage("morimori2").build()
val message_3 = StringRequest.newBuilder().setMessage("morimori3").build()
val stub = ReverseStringGrpc.newStub(channel)
val observer = stub.execStream(object : StreamObserver<ReverseReply> {
override fun onNext(reply: ReverseReply) {
println("res : " + reply.result)
}
override fun onError(t: Throwable?) {}
override fun onCompleted() {
println("complete")
}
})
observer.onNext(message_1)
observer.onNext(message_2)
observer.onNext(message_3)
observer.onComplete()
Server
req : morimori1
req : morimori2
req : morimori3
complete
Client
res : 1iromirom
res : 1iromirom
res : 2iromirom
res : 2iromirom
res : 3iromirom
res : 3iromirom
complete
As you can see from the sample results, the following biased communication is also possible.
Request:1、Response:N
+----------+ +---------------+
| |--- Request -->| |
| | | |
| | | |
| Client |<-- Response ---| Server |
| |<-- Response ---| |
| |<-- Response ---| |
| | | |
+----------+ +---------------+
Request:N、Response:1
+----------+ +---------------+
| |--- Request -->| |
| |--- Request -->| |
| |--- Request -->| |
| Client | | Server |
| | | |
| |<-- Response ---| |
| | | |
+----------+ +---------------+
Of course, I think it is possible to do things like Request: N, Response: N.
For two-way communication, the corresponding stub class was generated by adding stream
to the ProtocolBuffer definition.
Regarding the implementation, I felt that if I was aware of continuous event processing using StreamObservable, which is similar to RxJava Observable, I wouldn't be confused so much.
The impression is that it is fairly easy to write high-performance bidirectional and continuous RPC. Since gRPC can also be used with mobile apps, I wonder if incremental search can be realized efficiently.
-Setting memo when setting up a gRPC server with Spring Boot in Kotlin -Organization of RPC method in gRPC
Recommended Posts