An overview of gRPC can be found in the entry here (https://qiita.com/disc99/items/cfca50a32240284578bb).
In this entry, I focus on points to watch out for when operating gRPC and on how to implement it in Java.
In modern systems, communication between systems is increasing through APIs. In this case, the interface of each system is often developed in one of the following patterns.
- Pattern 1: Write the interface document (specifications, etc.) by hand, and have the server and client developers each implement to meet it.
- Pattern 2: Generate the interface documentation automatically from the server-side code, and implement the client to meet it.
- Pattern 3: Write the interface documentation by hand, and generate the server and client code for each system automatically from it.
Development with gRPC follows pattern 3 above. The interface document is defined first, and the implementation is generated from it automatically.
This has the following advantages:
- The implementation does not drift from the document
- It is easy to agree on the interface in advance
- Implementation costs can be reduced
This is the simplest gRPC implementation in Java.
build.gradle
buildscript {
repositories {
mavenCentral()
}
dependencies {
classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.8'
}
}
apply plugin: 'java'
apply plugin: 'idea'
apply plugin: 'com.google.protobuf'
repositories {
jcenter()
}
def grpcVersion = '1.21.0'
dependencies {
compile "io.grpc:grpc-netty:${grpcVersion}"
compile "io.grpc:grpc-protobuf:${grpcVersion}"
compile "io.grpc:grpc-stub:${grpcVersion}"
}
protobuf {
protoc {
artifact = "com.google.protobuf:protoc:3.8.0"
}
plugins {
grpc {
artifact = 'io.grpc:protoc-gen-grpc-java:1.21.0'
}
}
generateProtoTasks {
all()*.plugins {
grpc {}
}
}
}
// for IDEA
sourceSets {
main {
java {
srcDirs 'build/generated/source/proto/main/grpc'
srcDirs 'build/generated/source/proto/main/java'
}
}
}
src/main/proto/helloworld.proto
syntax = "proto3";
option java_multiple_files = true;
option java_package = "io.grpc.examples.helloworld";
package helloworld;
service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
message HelloRequest {
string name = 1;
}
message HelloReply {
string message = 1;
}
Generate the code from Gradle.
> ./gradlew generateProto
Implement the processing defined in the .proto file in a class that extends `*Grpc.*ImplBase`, and register it with `addService` of `ServerBuilder`.
DemoServer.java
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.examples.helloworld.GreeterGrpc;
import io.grpc.examples.helloworld.HelloReply;
import io.grpc.examples.helloworld.HelloRequest;
import io.grpc.stub.StreamObserver;
class DemoServer {
public static void main(String[] args) throws Exception {
Server server = ServerBuilder.forPort(6565)
.addService(new GreeterImpl())
.build();
server.start();
server.awaitTermination();
}
static class GreeterImpl extends GreeterGrpc.GreeterImplBase {
@Override
public void sayHello(HelloRequest req, StreamObserver<HelloReply> responseObserver) {
HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName()).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
}
}
`ManagedChannelBuilder` creates a channel to the server, and the automatically generated stub class is used to execute the request.
DemoClient.java
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.examples.helloworld.GreeterGrpc;
import io.grpc.examples.helloworld.HelloReply;
import io.grpc.examples.helloworld.HelloRequest;
class DemoClient {
public static void main(String[] args) {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 6565)
.usePlaintext()
.build();
GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc.newBlockingStub(channel);
HelloRequest request = HelloRequest.newBuilder()
.setName("Tom")
.build();
HelloReply reply = stub.sayHello(request);
System.out.println("Reply: " + reply);
}
}
When you start DemoServer and then run DemoClient, the following is printed.
output
> Reply: message: "Hello Tom"
For Spring Boot, a starter is available, which is convenient to use.
build.gradle
//...
dependencies {
// compile "io.grpc:grpc-netty:${grpcVersion}"
// compile "io.grpc:grpc-protobuf:${grpcVersion}"
// compile "io.grpc:grpc-stub:${grpcVersion}"
compile "org.lognet:grpc-spring-boot-starter:${grpcStarterVersion}" // double quotes are needed for ${...} interpolation in Groovy
}
//...
Add `@GRpcService` to the class that extends `*Grpc.*ImplBase`, and implement the processing defined in the .proto file.
DemoServerApplication.java
@SpringBootApplication
class DemoServerApplication {
public static void main(String[] args) {
SpringApplication.run(DemoServerApplication.class, args);
}
@GRpcService
public static class GreeterService extends GreeterGrpc.GreeterImplBase {
@Override
public void sayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + request.getName()).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
}
}
With gRPC, in addition to the conventional exchange of one request and one response, you can use bidirectional communication over HTTP/2.
Communication method | Overview | Use |
---|---|---|
Unary RPC | Returns one response for one request | General APIs, inter-application communication |
Server streaming RPC | Returns multiple responses for one request | Pushing multiple items from the server, such as server-side push, timelines, and feed delivery |
Client streaming RPC | Returns one response for multiple requests | Uploading large amounts of data, etc. |
Bidirectional streaming RPC | Returns multiple responses for multiple requests | Two-way communication such as chat |
The following is an implementation example of each.
src/main/proto/helloworld.proto
// ...
service Greeter {
rpc SayHelloUnary (HelloRequest) returns (HelloReply) {}
rpc SayHelloServerStreaming (HelloRequest) returns (stream HelloReply) {}
rpc SayHelloClientStreaming (stream HelloRequest) returns (HelloReply) {}
rpc SayHelloBidirectionalStreaming (stream HelloRequest) returns (stream HelloReply) {}
}
// ...
DemoServer.java
class DemoServer {
public static void main(String[] args) throws Exception {
// ...
}
public static class GreeterService extends GreeterGrpc.GreeterImplBase {
@Override
public void sayHelloUnary(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + request.getName()).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
@Override
public void sayHelloServerStreaming(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + request.getName()).build();
responseObserver.onNext(reply);
responseObserver.onNext(reply);
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
@Override
public StreamObserver<HelloRequest> sayHelloClientStreaming(StreamObserver<HelloReply> responseObserver) {
List<String> requests = new ArrayList<>();
return new StreamObserver<HelloRequest>() {
@Override
public void onNext(HelloRequest request) {
requests.add(request.getName());
}
@Override
public void onError(Throwable t) {
// ...
}
@Override
public void onCompleted() {
HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + requests.toString()).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
};
}
@Override
public StreamObserver<HelloRequest> sayHelloBidirectionalStreaming(StreamObserver<HelloReply> responseObserver) {
return new StreamObserver<HelloRequest>() {
@Override
public void onNext(HelloRequest request) {
HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + request.getName()).build();
responseObserver.onNext(reply);
responseObserver.onNext(reply);
responseObserver.onNext(reply);
}
@Override
public void onError(Throwable t) {
// ...
}
@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}
}
}
DemoClient.java
class DemoClient {
public static void main(String[] args) {
// ...
}
String unary() {
HelloRequest request = HelloRequest.newBuilder().setName("Tom").build();
return blockingStub().sayHelloUnary(request).toString(); // message: "Hello Tom"
}
String serverStreaming() {
HelloRequest request = HelloRequest.newBuilder().setName("Tom").build();
Iterator<HelloReply> replies = blockingStub().sayHelloServerStreaming(request);
List<HelloReply> response = new ArrayList<>();
while (replies.hasNext()) {
response.add(replies.next());
}
return response.toString(); // [message: "Hello Tom", message: "Hello Tom", message: "Hello Tom"]
}
String clientStreaming() throws Exception {
HelloRequest request = HelloRequest.newBuilder().setName("Tom").build();
CountDownLatch finishLatch = new CountDownLatch(1);
List<HelloReply> response = new ArrayList<>();
StreamObserver<HelloRequest> streamObserver = stub().sayHelloClientStreaming(new StreamObserver<HelloReply>() {
@Override
public void onNext(HelloReply reply) {
response.add(reply);
}
@Override
public void onError(Throwable t) {
// ...
}
@Override
public void onCompleted() {
finishLatch.countDown();
}
});
streamObserver.onNext(request);
streamObserver.onNext(request);
streamObserver.onNext(request);
streamObserver.onCompleted();
finishLatch.await(10, TimeUnit.SECONDS);
return response.toString(); // message: "Hello [Tom, Tom, Tom]"
}
String bidirectionalStreaming() throws Exception {
HelloRequest request = HelloRequest.newBuilder().setName("Tom").build();
CountDownLatch finishLatch = new CountDownLatch(1);
List<HelloReply> response = new ArrayList<>();
StreamObserver<HelloRequest> streamObserver = stub().sayHelloBidirectionalStreaming(new StreamObserver<HelloReply>() {
@Override
public void onNext(HelloReply reply) {
response.add(reply);
}
@Override
public void onError(Throwable t) {
// ...
}
@Override
public void onCompleted() {
finishLatch.countDown();
}
});
streamObserver.onNext(request);
streamObserver.onNext(request);
streamObserver.onNext(request);
streamObserver.onCompleted();
finishLatch.await(10, TimeUnit.SECONDS);
return response.toString(); // [message: "Hello Tom" , message: "Hello Tom" , message: "Hello Tom" , message: "Hello Tom" , message: "Hello Tom" , message: "Hello Tom" , message: "Hello Tom" , message: "Hello Tom" , message: "Hello Tom" ]
}
private GreeterGrpc.GreeterBlockingStub blockingStub() {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 6565)
.usePlaintext()
.build();
return GreeterGrpc.newBlockingStub(channel);
}
private GreeterGrpc.GreeterStub stub() {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 6565)
.usePlaintext()
.build();
return GreeterGrpc.newStub(channel);
}
}
Because the API uses the Observer pattern, it may be a little confusing if you are not familiar with it. RxJava and Reactor adopt the same kind of interface, so libraries such as Reactive-grpc exist to integrate gRPC with them.
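The calling convention behind the streaming examples can be sketched without gRPC. The block below is a minimal sketch, assuming a hypothetical `SketchObserver` interface that only mirrors the three callbacks of `StreamObserver`; `ObserverSketch`, `greet`, and `run` are names invented for illustration. It shows that the handler receives the observer used for replies and returns the observer that receives requests.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in that mirrors the three callbacks of io.grpc.stub.StreamObserver.
// It is NOT the gRPC interface; it only illustrates the calling convention.
interface SketchObserver<T> {
    void onNext(T value);
    void onError(Throwable t);
    void onCompleted();
}

class ObserverSketch {
    // Plays the role of a bidirectional handler: it receives the observer used to send
    // replies and returns the observer that will receive requests.
    static SketchObserver<String> greet(SketchObserver<String> replies) {
        return new SketchObserver<String>() {
            @Override
            public void onNext(String name) {
                replies.onNext("Hello " + name); // one reply per request in this sketch
            }
            @Override
            public void onError(Throwable t) {
                replies.onError(t);
            }
            @Override
            public void onCompleted() {
                replies.onCompleted(); // end the reply stream when the request stream ends
            }
        };
    }

    // Drives the handler the way a client would and records every callback in order.
    static List<String> run(String... names) {
        List<String> events = new ArrayList<>();
        SketchObserver<String> requests = greet(new SketchObserver<String>() {
            @Override
            public void onNext(String reply) { events.add("onNext:" + reply); }
            @Override
            public void onError(Throwable t) { events.add("onError:" + t.getMessage()); }
            @Override
            public void onCompleted() { events.add("onCompleted"); }
        });
        for (String name : names) {
            requests.onNext(name);
        }
        requests.onCompleted();
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run("Tom", "Ann"));
        // prints [onNext:Hello Tom, onNext:Hello Ann, onCompleted]
    }
}
```

Everything here runs synchronously on one thread. With a real asynchronous stub the reply callbacks arrive on another thread, which is why the client examples wait on a `CountDownLatch`.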
Because gRPC supports bidirectional communication, it keeps a persistent TCP connection open. This is more efficient than HTTP/1.1, which connects on a per-request basis, but it requires appropriate load balancing. The approaches and their characteristics are as follows.
Type | Advantages | Disadvantages |
---|---|---|
Proxy | Simple client-side implementation; also accessible to untrusted clients | Higher latency; throughput depends on the LB |
Client side | High performance because there is no middle layer | Client implementation is complicated and needs its own health checks and load distribution; must be implemented for each language; needs a mechanism to ensure the client can be trusted |
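To make the client-side row concrete, the selection step can be as small as a round-robin picker. This is a minimal sketch under stated assumptions: `RoundRobinSketch` and the backend addresses are hypothetical, the class is not part of gRPC or Eureka, and the health checking that the table lists as a requirement is left out.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not part of gRPC or Eureka: picks backends in round-robin order.
class RoundRobinSketch {
    private final List<String> backends;
    private final AtomicInteger next = new AtomicInteger();

    RoundRobinSketch(List<String> backends) {
        if (backends.isEmpty()) {
            throw new IllegalArgumentException("no backends");
        }
        this.backends = backends;
    }

    String pick() {
        // floorMod keeps the index non-negative even after the counter overflows.
        int index = Math.floorMod(next.getAndIncrement(), backends.size());
        return backends.get(index);
    }

    public static void main(String[] args) {
        // Addresses are placeholders for illustration only.
        RoundRobinSketch lb = new RoundRobinSketch(Arrays.asList("10.0.0.1:6565", "10.0.0.2:6565"));
        for (int i = 0; i < 4; i++) {
            System.out.println(lb.pick());
        }
    }
}
```

A client would call `pick()` before each request, or once per channel, depending on how evenly the load needs to be spread.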
Client-side load balancing can be achieved with Eureka. There are several ways to implement it, but it is convenient to use grpc-spring-boot-starter, as with the service implementation.
On the server side, add dependencies, set the application name, port to use, eureka server settings, and enable Eureka in the same way as when using normal Eureka.
build.gradle
//...
dependencies {
// ...
compile('org.springframework.cloud:spring-cloud-starter-eureka')
}
//...
bootstrap.yml
spring:
  application:
    name: demo-server # Application name
application.yml
grpc:
  port: 6565 # gRPC port
eureka:
  instance:
    nonSecurePort: ${grpc.port} # Register the gRPC port with Eureka
DemoServerApplication.java
@SpringBootApplication
@EnableEurekaClient //Enable Eureka
class DemoServerApplication {
public static void main(String[] args) {
SpringApplication.run(DemoServerApplication.class, args);
}
// ...
}
The client implementation is simple: add the dependency, enable Eureka, and use `EurekaClient` to get the IP address and port of the server. Eureka chooses the server instance, so no special load-balancing implementation is required on the client.
build.gradle
//...
dependencies {
// ...
compile('org.springframework.cloud:spring-cloud-starter-eureka')
}
//...
DemoClientApplication.java
@SpringBootApplication
@EnableEurekaClient //Enable Eureka
class DemoClientApplication {
@Autowired
EurekaClient client;
void sayHello(HelloRequest request) {
InstanceInfo instanceInfo = client.getNextServerFromEureka("demo-server", false); // Look up a server by the application name registered in Eureka
ManagedChannel channel = ManagedChannelBuilder.forAddress(instanceInfo.getIPAddr(), instanceInfo.getPort()) // Use the IP address and port from InstanceInfo
.usePlaintext()
.build();
GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc.newBlockingStub(channel);
stub.sayHello(request);
}
// ...
}
Basic errors are reported with the `onError` method of `StreamObserver`.
class DemoServer {
@Override
public void sayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
try {
// ...
} catch (Exception e) {
StatusRuntimeException exception = Status.INTERNAL
.withDescription(e.getMessage())
.withCause(e)
.asRuntimeException();
responseObserver.onError(exception);
}
}
}
DemoClient.java
class DemoClient {
public static void main(String[] args) {
try {
// ...
} catch (StatusRuntimeException e) {
e.printStackTrace(); // io.grpc.StatusRuntimeException: INTERNAL: error message...
}
}
}
In production, you often want to handle more detailed information about errors.
In that case, use `Metadata`.
Various kinds of information can be added to `Metadata`, and it is convenient to define that information in a proto file.
message Error {
string message = 1;
string detail = 2;
}
class DemoServer {
@Override
public void sayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
try {
// ...
} catch (Exception e) {
Metadata metadata = new Metadata(); //Generate Metadata
Error error = Error.newBuilder()
.setMessage("my error")
.setDetail("error detail")
.build();
Metadata.Key<Error> key = ProtoUtils.keyForProto(error);
metadata.put(key, error); //Added error information
StatusRuntimeException exception = Status.INTERNAL
.withDescription(e.getMessage())
.withCause(e)
.asRuntimeException(metadata); //Add Metadata
responseObserver.onError(exception);
}
}
}
DemoClient.java
class DemoClient {
public static void main(String[] args) {
try {
// ...
} catch (StatusRuntimeException e) {
e.printStackTrace();
Status status = Status.fromThrowable(e); // Status{code=INTERNAL, description=Invalid parameter, cause=null}
Metadata metadata = Status.trailersFromThrowable(e); // Metadata(content-type=application/grpc,helloworld.error-bin=CghteSBlcnJvchIMZXJyb3IgZGV0YWls)
Error error = metadata.get(ProtoUtils.keyForProto(Error.getDefaultInstance())); // message: "my error" detail: "error detail"
}
}
}
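The `helloworld.error-bin` value in the client comment above is the serialized `Error` message: metadata keys ending in `-bin` carry binary values, which are shown base64-encoded. The following sketch decodes that string with the JDK only, to show what is being carried. `BinaryMetadataSketch` and `stringFields` are hypothetical names, and the parser is deliberately limited to messages made of string fields like `Error`; real code should use the generated class as the client does.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;

class BinaryMetadataSketch {
    // Reads the length-delimited string fields of a serialized message, in order.
    // Simplification: assumes every field uses wire type 2 and a one-byte tag,
    // which holds for the two-string Error message but not for protobuf in general.
    static List<String> stringFields(byte[] data) {
        List<String> fields = new ArrayList<>();
        int pos = 0;
        while (pos < data.length) {
            int tag = data[pos++] & 0xFF;
            if ((tag & 0x07) != 2) {
                throw new IllegalArgumentException("unsupported wire type");
            }
            // Decode the varint that holds the field length.
            int length = 0;
            int shift = 0;
            while (true) {
                int b = data[pos++] & 0xFF;
                length |= (b & 0x7F) << shift;
                if ((b & 0x80) == 0) {
                    break;
                }
                shift += 7;
            }
            fields.add(new String(data, pos, length, StandardCharsets.UTF_8));
            pos += length;
        }
        return fields;
    }

    public static void main(String[] args) {
        // Value of the helloworld.error-bin entry shown in the client comment.
        byte[] raw = Base64.getDecoder().decode("CghteSBlcnJvchIMZXJyb3IgZGV0YWls");
        System.out.println(stringFields(raw)); // prints [my error, error detail]
    }
}
```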
If an exception is thrown on the server without being handled and reported through `onError`, a StatusRuntimeException (status UNKNOWN) occurs on the client side.
The client can detect that an exception occurred on the server, but not what kind of exception it was.
Because an ordinary application can throw a RuntimeException at any time, every server-side endpoint would need a try-catch.
Such handling quickly becomes redundant, and an interceptor lets you embed it transparently.
You can implement your own error handling by implementing `io.grpc.ServerInterceptor`, but if general error handling is enough, the standard `io.grpc.util.TransmitStatusRuntimeExceptionInterceptor` is convenient.
DemoServer.java
class DemoServer {
public static void main(String[] args) throws Exception {
Server server = ServerBuilder.forPort(6565)
.intercept(TransmitStatusRuntimeExceptionInterceptor.instance()) //Interceptor registration
.addService(new GreeterImpl())
.build();
// ...
}
static class GreeterImpl extends GreeterGrpc.GreeterImplBase {
@Override
public void sayHello(HelloRequest req, StreamObserver<HelloReply> responseObserver) {
//Metadata generation etc....
//Throw StatusRuntimeException instead of onError
throw Status.INTERNAL
.withDescription("server error")
.asRuntimeException(metadata);
}
}
}
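For the custom route, the core idea of an interceptor is a single wrapper applied to every handler, so that no endpoint needs its own try-catch. The sketch below models this with plain Java functions rather than `io.grpc.ServerInterceptor`. `ErrorWrapperSketch`, `Result`, and `withErrorHandling` are hypothetical, and the mapping from exception type to status name is only one possible choice.

```java
import java.util.function.Function;

class ErrorWrapperSketch {
    // Hypothetical result type for this sketch; real gRPC code would use io.grpc.Status.
    static final class Result {
        final String code;
        final String body;

        Result(String code, String body) {
            this.code = code;
            this.body = body;
        }
    }

    // Wraps a handler so that every endpoint gets the same exception handling.
    static Function<String, Result> withErrorHandling(Function<String, String> handler) {
        return request -> {
            try {
                return new Result("OK", handler.apply(request));
            } catch (IllegalArgumentException e) {
                // Assumption of this sketch: bad input maps to INVALID_ARGUMENT.
                return new Result("INVALID_ARGUMENT", e.getMessage());
            } catch (RuntimeException e) {
                // Everything else is reported as INTERNAL instead of leaking as UNKNOWN.
                return new Result("INTERNAL", e.getMessage());
            }
        };
    }

    public static void main(String[] args) {
        Function<String, Result> hello = withErrorHandling(name -> {
            if (name.isEmpty()) {
                throw new IllegalArgumentException("name is required");
            }
            return "Hello " + name;
        });
        Result ok = hello.apply("Tom");
        System.out.println(ok.code + " " + ok.body); // prints OK Hello Tom
        System.out.println(hello.apply("").code);    // prints INVALID_ARGUMENT
    }
}
```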
If the server fails for some reason, you can retry on the client side.
DemoClient.java
class DemoClient {
public static void main(String[] args) {
//Retry policy settings
Map<String, Object> retryPolicy = new HashMap<>();
retryPolicy.put("maxAttempts", 3D);
retryPolicy.put("initialBackoff", "0.5s");
retryPolicy.put("maxBackoff", "1s");
retryPolicy.put("backoffMultiplier", 2D);
retryPolicy.put("retryableStatusCodes", Arrays.asList("UNAVAILABLE"));
Map<String, Object> methodConfig = new HashMap<>();
Map<String, Object> name = new HashMap<>();
name.put("service", "helloworld.Greeter");
methodConfig.put("name", Collections.singletonList(name));
methodConfig.put("retryPolicy", retryPolicy);
Map<String, Object> serviceConfig = new HashMap<>();
serviceConfig.put("methodConfig", Collections.singletonList(methodConfig));
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 6565)
.usePlaintext()
.enableRetry() //Enable retry
.defaultServiceConfig(serviceConfig) //Apply settings
.build();
// ...
//If UNAVAILABLE occurs, it will automatically retry according to the retry policy.
HelloReply reply = stub.sayHello(request);
// ...
}
}
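It can help to see what the retry policy above amounts to in time. As I understand the gRPC retry design, `maxAttempts` counts the original call, and the delay before the n-th retry is chosen at random between 0 and min(initialBackoff * backoffMultiplier^(n-1), maxBackoff). The sketch below only computes those upper bounds; `RetryBackoffSketch` and its methods are hypothetical helpers, not gRPC APIs.

```java
import java.util.ArrayList;
import java.util.List;

class RetryBackoffSketch {
    // Upper bound, in milliseconds, of the randomized delay before the n-th retry (n starts at 1).
    static long upperBoundMillis(int retry, long initialBackoffMillis, long maxBackoffMillis, double multiplier) {
        double raw = initialBackoffMillis * Math.pow(multiplier, retry - 1);
        return (long) Math.min(raw, maxBackoffMillis);
    }

    // maxAttempts counts the original call, so there are at most maxAttempts - 1 retries.
    static List<Long> schedule(int maxAttempts, long initialBackoffMillis, long maxBackoffMillis, double multiplier) {
        List<Long> bounds = new ArrayList<>();
        for (int retry = 1; retry < maxAttempts; retry++) {
            bounds.add(upperBoundMillis(retry, initialBackoffMillis, maxBackoffMillis, multiplier));
        }
        return bounds;
    }

    public static void main(String[] args) {
        // Values from the retry policy above: 3 attempts, 0.5s initial, 1s max, multiplier 2.
        System.out.println(schedule(3, 500, 1000, 2.0)); // prints [500, 1000]
    }
}
```

With these settings a call is tried at most three times, and the two retries wait at most 0.5 s and 1 s respectively.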
gRPC has no dedicated validation specification, so treat validation failures like ordinary errors.
`Status.INVALID_ARGUMENT` is available for invalid input.
gRPC provides authentication mechanisms that use SSL/TLS and Google tokens.
You may also want a feature like `@PreAuthorize` in Spring Security.
Currently, such a feature is not officially supported, but it can be implemented with an interceptor.
The following entries will be helpful.
DemoGrpcService.java
@GRpcService
public class DemoGrpcService extends DemoServiceGrpc.DemoServiceImplBase {
@Override
@PreAuthorize("hasRole('USER')")
public void list(ListRequest request, StreamObserver<ListResponse> responseObserver) {
// ...
}
@Override
@PreAuthorize("hasRole('ADMIN')")
public void buy(BuyRequest request, StreamObserver<BuyResponse> responseObserver) {
// ...
}
}
This is also being discussed in the grpc-spring-boot-starter issues.
However, gRPC is currently used mostly for communication inside a platform rather than in a widely open environment, so it is worth considering how much access control you actually need.
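If you do build this with an interceptor, the decision itself can be a lookup from the full method name, which gRPC formats as `package.Service/Method`, to a required role. The following is a minimal sketch of that lookup only. `MethodRoleSketch` is hypothetical, the `demo` package name is an assumption because the .proto for `DemoService` is not shown, and how the caller's roles are obtained is out of scope.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class MethodRoleSketch {
    // Hypothetical mapping from full method name ("package.Service/Method") to the required role.
    // The "demo" package name is an assumption; the source does not show the .proto for DemoService.
    private static final Map<String, String> REQUIRED_ROLE = new HashMap<>();
    static {
        REQUIRED_ROLE.put("demo.DemoService/List", "USER");
        REQUIRED_ROLE.put("demo.DemoService/Buy", "ADMIN");
    }

    // Returns true when the caller holds the role required for the method.
    // Methods without an entry are rejected, a deny-by-default choice made for this sketch.
    static boolean isAllowed(String fullMethodName, Set<String> callerRoles) {
        String required = REQUIRED_ROLE.get(fullMethodName);
        return required != null && callerRoles.contains(required);
    }

    public static void main(String[] args) {
        Set<String> roles = new HashSet<>(Arrays.asList("USER"));
        System.out.println(isAllowed("demo.DemoService/List", roles)); // prints true
        System.out.println(isAllowed("demo.DemoService/Buy", roles));  // prints false
    }
}
```

An interceptor would run a check like this before passing the call on, and close the call with a permission error when it returns false.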
If you want to test gRPC, use the officially provided library.
build.gradle
//...
dependencies {
// ...
testCompile "io.grpc:grpc-testing:${grpcVersion}"
}
//...
DemoServerTest.java
class DemoServerTest {
//Use the Rule class provided by the gRPC library
@Rule
public GrpcServerRule grpcServerRule = new GrpcServerRule().directExecutor();
@Test
public void test() {
grpcServerRule.getServiceRegistry().addService(new GreeterService());
GreeterGrpc.GreeterBlockingStub blockingStub = GreeterGrpc.newBlockingStub(grpcServerRule.getChannel());
String testName = "test name";
HelloReply reply = blockingStub.sayHello(HelloRequest.newBuilder().setName(testName).build());
assertEquals("Hello " + testName, reply.getMessage());
}
}
DemoClientTest.java
class DemoClientTest {
//Use the Rule class provided by the gRPC library
@Rule
public GrpcServerRule grpcServerRule = new GrpcServerRule().directExecutor();
@Test
public void test() {
GreeterGrpc.GreeterImplBase serviceImpl = Mockito.spy(new GreeterGrpc.GreeterImplBase() {}); //Mocking Server implementation
grpcServerRule.getServiceRegistry().addService(serviceImpl);
ArgumentCaptor<HelloRequest> requestCaptor = ArgumentCaptor.forClass(HelloRequest.class);
String testName = "test name";
DemoClient client = new DemoClient(grpcServerRule.getChannel());
client.hello(testName);
Mockito.verify(serviceImpl)
.sayHello(requestCaptor.capture(), Matchers.any()); //Capture execution
assertEquals(testName, requestCaptor.getValue().getName());
}
}
To use Protocol Buffers with gRPC, you need to manage .proto files. There is no officially supported way of doing this, so here are two typical approaches.
The approach used by Mercari and others is to create a dedicated repository for .proto files (Proto File Management). This makes it easy to hook into CI and generate a library for each client language. Alternatively, you can import the .proto files on the client side with a Git submodule or similar, without generating libraries.
With a dedicated repository, the server-side code and the .proto files live in separate repositories, but I think it is often easier to keep the .proto files with the server that implements them. In that case, you can use protodep to gather the .proto files from each repository on the client side. This lets you manage .proto files inside the server application while the client pulls in only the .proto files it needs.
gRPC-Web
To use gRPC from a browser, there is gRPC-Web. At the moment, a proxy such as nginx or Envoy is used to connect the browser to the gRPC server.
REST API
You will often want to access an application built with gRPC through a REST API. In that case, grpc-gateway is convenient. That topic is somewhat long, so I will cover it in a separate entry.
gRPC is a very effective mechanism for building a system out of multiple applications, but I think many points are unclear in actual operation, so I summarized them here. I will update this entry as I learn more.