Un aperçu de gRPC peut être trouvé dans l'entrée ici (https://qiita.com/disc99/items/cfca50a32240284578bb).
Dans cette entrée, je me concentrerai sur les points de préoccupation lors de l'utilisation de gRPC et sur le cas de sa mise en œuvre en Java.
Dans les systèmes modernes, la communication entre les systèmes augmente grâce aux API. Dans ce cas, l'interface de chaque système est souvent développée selon l'un des modèles suivants.
Modèle 1: Décrivez manuellement les documents d'interface (spécifications, etc.) et implémentez les développeurs serveur et client pour répondre aux exigences. Modèle 2: générer automatiquement la documentation de l'interface à partir du code côté serveur et implémenter le client pour répondre à ses exigences Modèle 3: rédiger manuellement la documentation de l'interface pour générer automatiquement le code serveur et client pour chaque système
Dans le développement utilisant gRPC, les 3 modèles ci-dessus sont utilisés. Cette méthode prédéfinit le document d'interface et génère automatiquement une implémentation pour celui-ci.
Il y a des mérites tels que.
C'est le cas lors de l'implémentation du gRPC le plus simple en Java.
build.gradle
buildscript {
repositories {
mavenCentral()
}
dependencies {
classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.8'
}
}
apply plugin: 'java'
apply plugin: 'idea'
apply plugin: 'com.google.protobuf'
repositories {
jcenter()
}
def grpcVersion = '1.21.0'
dependencies {
compile "io.grpc:grpc-netty:${grpcVersion}"
compile "io.grpc:grpc-protobuf:${grpcVersion}"
compile "io.grpc:grpc-stub:${grpcVersion}"
}
protobuf {
protoc {
artifact = "com.google.protobuf:protoc:3.8.0"
}
plugins {
grpc {
artifact = 'io.grpc:protoc-gen-grpc-java:1.21.0'
}
}
generateProtoTasks {
all()*.plugins {
grpc {}
}
}
}
// for IDEA
sourceSets {
main {
java {
srcDirs 'build/generated/source/proto/main/grpc'
srcDirs 'build/generated/source/proto/main/java'
}
}
}
src/main/proto/helloworld.proto
syntax = "proto3";
option java_multiple_files = true;
option java_package = "io.grpc.examples.helloworld";
package helloworld;
service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
message HelloRequest {
string name = 1;
}
message HelloReply {
string message = 1;
}
Génération de code à utiliser à partir de gradle
> ./gradlew generateProto
Dans la classe qui hérite de * Grpc. * ImplBase
, implémentez le processus défini dans .proto
et enregistrez-le avec ʻaddService de
ServerBuilder`.
DemoServer.java
class DemoServer {
public static void main(String[] args) throws Exception {
Server server = ServerBuilder.forPort(6565)
.addService(new GreeterImpl())
.build();
server.start();
server.awaitTermination();
}
static class GreeterImpl extends GreeterGrpc.GreeterImplBase {
@Override
public void sayHello(HelloRequest req, StreamObserver<HelloReply> responseObserver) {
HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName()).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
}
}
ManagedChannelBuilder
crée un canal pour le serveur et utilise la classe Stub générée automatiquement pour exécuter la requête.
DemoClient.java
class DemoClient {
public static void main(String[] args) {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 6565)
.usePlaintext()
.build();
GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc.newBlockingStub(channel);
HelloRequest request = HelloRequest.newBuilder()
.setName("Tom")
.build();
HelloReply reply = stub.sayHello(request);
System.out.println("Reply: " + reply);
}
}
Lorsque vous démarrez DemoServer et exécutez DemoClient, la sortie suivante sera sortie.
output
> Reply: message: "Hello Tom"
Pour Spring Boot, il existe un starter, qui est pratique à utiliser.
build.gradle
//...
dependencies {
// compile "io.grpc:grpc-netty:${grpcVersion}"
// compile "io.grpc:grpc-protobuf:${grpcVersion}"
// compile "io.grpc:grpc-stub:${grpcVersion}"
compile('org.lognet:grpc-spring-boot-starter:${grpcStarterVersion}')
}
//...
Ajoutez @ GRpcService
à la classe qui hérite de * Grpc. * ImplBase
et implémentez le processus défini dans .proto
.
DemoServerApplication.java
@SpringBootApplication
class DemoServerApplication {
public static void main(String[] args) {
SpringApplication.run(DemoServerApplication.class, args);
}
@GRpcService
public static class GreeterService extends GreeterGrpc.GreeterImplBase {
@Override
public void sayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + request.getName()).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
}
}
Avec gRPC, vous pouvez utiliser une communication bidirectionnelle utilisant HTTP / 2 au lieu de la communication simple conventionnelle.
Méthode de communication | image | Aperçu | Utilisation |
---|---|---|---|
Unary RPC | Une méthode qui renvoie une réponse pour une demande | API générale, communication entre applications | |
Server streaming RPC | Une méthode qui renvoie plusieurs réponses à une demande | Lors de la transmission de plusieurs données depuis le serveur, telles que la poussée côté serveur, la chronologie, la livraison de flux, etc. | |
Client streaming RPC | Une méthode qui renvoie une réponse à plusieurs demandes | Importante quantité de données téléchargées, etc. | |
Bidirectional streaming RPC | Une méthode qui renvoie plusieurs réponses à plusieurs demandes | Pour une communication bidirectionnelle telle que le chat |
Voici un exemple de chaque implémentation.
src/main/proto/helloworld.proto
// ...
service Greeter {
rpc SayHelloUnary (HelloRequest) returns (HelloReply) {}
rpc SayHelloServerStreaming (HelloRequest) returns (stream HelloReply) {}
rpc SayHelloClientStreaming (stream HelloRequest) returns (HelloReply) {}
rpc SayHelloBidirectionalStreaming (stream HelloRequest) returns (stream HelloReply) {}
}
// ...
DemoServer.java
class DemoServer {
public static void main(String[] args) throws Exception {
// ...
}
public static class GreeterService extends GreeterGrpc.GreeterImplBase {
@Override
public void sayHelloUnary(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + request.getName()).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
@Override
public void sayHelloServerStreaming(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + request.getName()).build();
responseObserver.onNext(reply);
responseObserver.onNext(reply);
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
@Override
public StreamObserver<HelloRequest> sayHelloClientStreaming(StreamObserver<HelloReply> responseObserver) {
List<String> requests = new ArrayList<>();
return new StreamObserver<HelloRequest>() {
@Override
public void onNext(HelloRequest request) {
requests.add(request.getName());
}
@Override
public void onError(Throwable t) {
// ...
}
@Override
public void onCompleted() {
HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + requests.toString()).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
};
}
@Override
public StreamObserver<HelloRequest> sayHelloBidirectionalStreaming(StreamObserver<HelloReply> responseObserver) {
return new StreamObserver<HelloRequest>() {
@Override
public void onNext(HelloRequest request) {
HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + request.getName()).build();
responseObserver.onNext(reply);
responseObserver.onNext(reply);
responseObserver.onNext(reply);
}
@Override
public void onError(Throwable t) {
// ...
}
@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}
}
}
DemoClient.java
class DemoClient {
public static void main(String[] args) {
// ...
}
String unary() {
HelloRequest request = HelloRequest.newBuilder().setName("Tom").build();
return blockingStub().sayHelloUnary(request).toString(); // message: "Hello Tom"
}
String serverStreaming() {
HelloRequest request = HelloRequest.newBuilder().setName("Tom").build();
Iterator<HelloReply> replies = blockingStub().sayHelloServerStreaming(request);
List<HelloReply> response = new ArrayList<>();
while (replies.hasNext()) {
response.add(replies.next());
}
return response.toString(); // [message: "Hello Tom", message: "Hello Tom", message: "Hello Tom"]
}
String clientStreaming() throws Exception {
HelloRequest request = HelloRequest.newBuilder().setName("Tom").build();
CountDownLatch finishLatch = new CountDownLatch(1);
List<HelloReply> response = new ArrayList<>();
StreamObserver<HelloRequest> streamObserver = stub().sayHelloClientStreaming(new StreamObserver<HelloReply>() {
@Override
public void onNext(HelloReply reply) {
response.add(reply);
}
@Override
public void onError(Throwable t) {
// ...
}
@Override
public void onCompleted() {
finishLatch.countDown();
}
});
streamObserver.onNext(request);
streamObserver.onNext(request);
streamObserver.onNext(request);
streamObserver.onCompleted();
finishLatch.await(10, TimeUnit.SECONDS);
return response.toString(); // message: "Hello [Tom, Tom, Tom]"
}
String bidirectionalStreaming() throws Exception {
HelloRequest request = HelloRequest.newBuilder().setName("Tom").build();
CountDownLatch finishLatch = new CountDownLatch(1);
List<HelloReply> response = new ArrayList<>();
StreamObserver<HelloRequest> streamObserver = stub().sayHelloBidirectionalStreaming(new StreamObserver<HelloReply>() {
@Override
public void onNext(HelloReply reply) {
response.add(reply);
}
@Override
public void onError(Throwable t) {
// ...
}
@Override
public void onCompleted() {
finishLatch.countDown();
}
});
streamObserver.onNext(request);
streamObserver.onNext(request);
streamObserver.onNext(request);
streamObserver.onCompleted();
finishLatch.await(10, TimeUnit.SECONDS);
return response.toString(); // [message: "Hello Tom" , message: "Hello Tom" , message: "Hello Tom" , message: "Hello Tom" , message: "Hello Tom" , message: "Hello Tom" , message: "Hello Tom" , message: "Hello Tom" , message: "Hello Tom" ]
}
private GreeterGrpc.GreeterBlockingStub blockingStub() {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 6565)
.usePlaintext(true)
.build();
return GreeterGrpc.newBlockingStub(channel);
}
private GreeterGrpc.GreeterStub stub() {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 6565)
.usePlaintext(true)
.build();
return GreeterGrpc.newStub(channel);
}
}
Comme il utilise le modèle Observer, cela peut être un peu déroutant si vous ne le connaissez pas. Aussi, ce type d'interface est également adopté dans RxJava et Reactor, afin de s'intégrer à chaque Reactive-grpc etc. existe également.
Étant donné que la communication gRPC permet une communication bidirectionnelle, il s'agit d'une connexion permanente utilisant une connexion TCP. C'est plus efficace que HTTP / 1.1, qui se connecte sur une base de communication, mais nécessite un équilibrage de charge approprié. Voici les méthodes et fonctionnalités.
type | La description | mérite | デmérite |
---|---|---|---|
Proxy | Implémentation simple côté client Également accessible aux clients peu fiables |
Latence élevée Dépend du débit par LB |
|
Client Side | Haute performance car il n'y a pas de couche intermédiaire | L'implémentation du client est compliquée et il est nécessaire de préparer un mécanisme de vérification de l'état et de répartition de la charge. Doit être implémenté pour chaque langue Un mécanisme pour assurer la fiabilité du client est nécessaire |
Vous pouvez réaliser un équilibrage de charge client à l'aide d'Eureka. Il existe plusieurs façons de l'implémenter, mais il est pratique d'utiliser grpc-spring-boot-starter avec l'implémentation de Service.
Du côté du serveur, comme dans le cas de l'utilisation d'Eureka normale, ajoutez des dépendances, définissez le nom de l'application, le port à utiliser, les paramètres du serveur eureka et activez Eureka.
build.gradle
//...
dependencies {
// ...
compile('org.springframework.cloud:spring-cloud-starter-eureka')
}
//...
bootstrap.yml
spring:
application:
name: demo-server //Réglage du nom de l'application
application.yml
grpc:
port: 6565 //Paramètres du port gRPC
eureka:
instance:
nonSecurePort: ${grpc.port} //défini sur le port gRPC et Eureka
DemoServerApplication.java
@SpringBootApplication
@EnableEurekaClient //Activer Eureka
class DemoServerApplication {
public static void main(String[] args) {
SpringApplication.run(DemoServerApplication.class, args);
}
// ...
}
L'implémentation du client est simple, ajoutez des dépendances, activez Eureka et utilisez EurekaClient pour obtenir l'adresse IP et le port du serveur. L'équilibrage de charge est effectué côté Eureka, aucune implémentation spéciale n'est donc requise sur le client.
build.gradle
//...
dependencies {
// ...
compile('org.springframework.cloud:spring-cloud-starter-eureka')
}
//...
DemoClientApplication.java
@SpringBootApplication
@EnableEurekaClient //Activer Eureka
class DemoClientApplication {
@Autowired
EurekaClient client;
void sayHello(HelloRequest request) {
InstanceInfo instanceInfo = client.getNextServerFromEureka("backend-service", false); //Obtenir des informations sur le serveur depuis Eureka Client
ManagedChannel channel = ManagedChannelBuilder.forAddress(instanceInfo.getIPAddr(), instanceInfo.getPort()) //Définir l'adresse IP et le port à partir des informations d'instance
.usePlaintext(true)
.build();
GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc.newBlockingStub(channel);
stub.sayHello(request);
}
// ...
}
L'erreur de base utilise la méthode ʻonError
StreamObserver`.
class DemoServer {
@Override
public void sayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
try {
// ...
} catch (Exception e) {
StatusRuntimeException exception = Status.INTERNAL
.withDescription(e.getMessage())
.withCause(e)
.asRuntimeException();
responseObserver.onError(exception);
}
}
}
DemoClient.java
class DemoClient {
public static void main(String[] args) {
try {
// ...
} catch (StatusRuntimeException e) {
e.printStackTrace(); // io.grpc.StatusRuntimeException: INTERNAL: error message...
}
}
}
En production, vous souhaitez souvent traiter des informations plus détaillées sur les erreurs.
Dans ce cas, utilisez Metadata
.
Diverses informations peuvent être ajoutées aux «Métadonnées», mais il est pratique d'utiliser un fichier proto.
message Error {
string message = 1;
string detail = 2;
}
class DemoServer {
@Override
public void sayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
try {
// ...
} catch (Exception e) {
Metadata metadata = new Metadata(); //Générer des métadonnées
Error error = Error.newBuilder()
.setMessage("my error")
.setDetail("error detail")
.build();
Metadata.Key<Error> key = ProtoUtils.keyForProto(error);
metadata.put(key, error); //Ajout d'informations d'erreur
StatusRuntimeException exception = Status.INTERNAL
.withDescription(e.getMessage())
.withCause(e)
.asRuntimeException(metadata); //Ajouter des métadonnées
responseObserver.onError(exception);
}
}
}
DemoClient.java
class DemoClient {
public static void main(String[] args) {
try {
// ...
} catch (StatusRuntimeException e) {
e.printStackTrace();
Status status = Status.fromThrowable(e); // Status{code=INTERNAL, description=Invalid parameter, cause=null}
Metadata metadata = Status.trailersFromThrowable(e); // Metadata(content-type=application/grpc,helloworld.error-bin=CghteSBlcnJvchIMZXJyb3IgZGV0YWls)
Error error = metadata.get(ProtoUtils.keyForProto(Error.getDefaultInstance())); // error=message: "my error detail: "error detail"
}
}
}
Si une exception se produit sur le serveur sans gestion des erreurs dues à onError, une exception StatusRuntimeException (Status = Unknown) se produira du côté client.
Il peut détecter qu'une exception s'est produite côté serveur, mais on ne sait pas quel type d'exception s'est produit.
Un try-catch est requis sur chaque point de terminaison côté serveur, car les applications normales peuvent toujours lever une RuntimeException.
Cependant, un tel traitement devient très redondant, c'est donc l'Interceptor qui intègre de manière transparente le traitement.
Vous pouvez implémenter votre propre gestion des erreurs en implémentant ʻio.grpc.ServerInterceptor, mais il est pratique d'utiliser le standard ʻio.grpc.util.TransmitStatusRuntimeExceptionInterceptor
si la gestion générale des erreurs est acceptable.
DemoServer.java
class DemoServer {
public static void main(String[] args) throws Exception {
Server server = ServerBuilder.forPort(6565)
.intercept(TransmitStatusRuntimeExceptionInterceptor.instance()) //Enregistrement des intercepteurs
.addService(new GreeterImpl())
.build();
// ...
}
static class GreeterImpl extends GreeterGrpc.GreeterImplBase {
@Override
public void sayHello(HelloRequest req, StreamObserver<HelloReply> responseObserver) {
//Générez des métadonnées, etc....
//Lancer une exception StatusRuntimeException au lieu de onError
throw Status.INTERNAL
.withDescription("server error")
.asRuntimeException(metadata);
}
}
}
Si le serveur échoue pour une raison quelconque, vous pouvez réessayer du côté client.
DemoClient.java
class DemoClient {
public static void main(String[] args) {
//Réessayer les paramètres de stratégie
Map<String, Object> retryPolicy = new HashMap<>();
retryPolicy.put("maxAttempts", 3D);
retryPolicy.put("initialBackoff", "0.5s");
retryPolicy.put("maxBackoff", "1s");
retryPolicy.put("backoffMultiplier", 2D);
retryPolicy.put("retryableStatusCodes", Arrays.asList("UNAVAILABLE"));
Map<String, Object> methodConfig = new HashMap<>();
Map<String, Object> name = new HashMap<>();
name.put("service", "helloworld.Greeter");
methodConfig.put("name", Collections.singletonList(name));
methodConfig.put("retryPolicy", retryPolicy);
Map<String, Object> serviceConfig = new HashMap<>();
serviceConfig.put("methodConfig", Collections.singletonList(methodConfig));
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 6565)
.usePlaintext()
.enableRetry() //Activer la nouvelle tentative
.defaultServiceConfig(serviceConfig) //Appliquer les paramètres
.build();
// ...
//Si NON DISPONIBLE se produit, il réessayera automatiquement conformément à la stratégie de nouvelle tentative.
HelloReply reply = stub.sayHello(request);
// ...
}
}
Il n'y a pas de spécification spécifique pour la validation gRPC, alors traitez-le comme une erreur normale.
Il existe également un Status.INVALID_ARGUMENT
pour une entrée incorrecte.
gRPC fournit un mécanisme d'authentification à l'aide de SSL / TLS et d'authentification à l'aide de jetons Google.
Vous voudrez peut-être une fonctionnalité comme PreAuthorize
dans Spring Security.
Actuellement, une telle fonction n'est pas officiellement prise en charge, mais elle peut être implémentée à l'aide d'un intercepteur.
Les entrées suivantes seront utiles.
DemoGrpcService.java
@GRpcService
public class DemoGrpcService extends DemoServiceGrpc.DemoServiceImplBase {
@Override
@PreAuthorize("hasRole('USER')")
public void list(ListRequest request, StreamObserver<ListResponse> responseObserver) {
// ...
}
@Override
@PreAuthorize("hasRole('ADMIN')")
public void buy(BuyRequest request, StreamObserver<BuyResponse> responseObserver) {
// ...
}
}
Il est également pris en compte dans grpc-spring-boot-starter issues.
Cependant, en tant qu'utilisation actuelle de gRPC, il est souvent utilisé pour la communication au sein de la plate-forme plutôt que dans un environnement largement ouvert, il est donc préférable de prendre en compte le niveau de contrôle requis.
Lors du test de gRPC, utilisez la bibliothèque officiellement fournie.
build.gradle
//...
dependencies {
// ...
testCompile "io.grpc:grpc-testing:${grpcVersion}"
}
//...
DemoServerTest.java
class DemoServerTest {
//Utilisez la classe Rule fournie par la bibliothèque gRPC
@Rule
public GrpcServerRule grpcServerRule = new GrpcServerRule().directExecutor();
@Test
public void test() {
grpcServerRule.getServiceRegistry().addService(new GreeterService());
GreeterGrpc.GreeterBlockingStub blockingStub = GreeterGrpc.newBlockingStub(grpcServerRule.getChannel());
String testName = "test name";
HelloReply reply = blockingStub.sayHello(HelloRequest.newBuilder().setName(testName).build());
assertEquals("Hello " + testName, reply.getMessage());
}
}
DemoClientTest.java
class DemoClientTest {
//Utilisez la classe Rule fournie par la bibliothèque gRPC
@Rule
public GrpcServerRule grpcServerRule = new GrpcServerRule().directExecutor();
@Test
public void test() {
GreeterGrpc.GreeterImplBase serviceImpl = Mockito.spy(new GreeterGrpc.GreeterImplBase() {}); //Se moquer de l'implémentation du serveur
grpcServerRule.getServiceRegistry().addService(serviceImpl);
ArgumentCaptor<HelloRequest> requestCaptor = ArgumentCaptor.forClass(HelloRequest.class);
String testName = "test name";
DemoClient client = new DemoClient(grpcServerRule.getChannel());;
client.hello(testName);
Mockito.verify(serviceImpl)
.sayHello(requestCaptor.capture(), Matchers.any()); //Exécution de la capture
assertEquals(testName, requestCaptor.getValue().getName());
}
}
Pour utiliser les tampons de protocole avec gRPC, vous devez gérer les fichiers .proto. Il n'y a pas de fonctionnalités officiellement prises en charge pour cette méthode, voici donc deux fonctionnalités typiques.
La méthode utilisée par Mercari etc. est de créer un référentiel dédié pour .proto. (Gestion de fichiers Proto) Dans ce cas, il sera plus facile de générer une bibliothèque pour chaque langue client en établissant un lien avec CI. Vous pouvez également utiliser le sous-module de Git, etc. pour importer le fichier .proto côté client sans le faire.
Lors de la gestion avec un référentiel dédié, le code côté serveur et le référentiel du fichier .proto sont séparés, mais je pense que c'est souvent plus facile à gérer en incluant .proto côté serveur qui l'implémente. Dans ce cas, vous pouvez utiliser protodep pour agréger les fichiers .proto dans chaque référentiel côté client. Cela permet de gérer les fichiers .proto dans l'application côté serveur et d'utiliser les fichiers .proto requis côté client.
gRPC-Web Pour utiliser gRPC depuis un navigateur, il existe gRPC-Web. Pour le moment, nginx et envoy sont utilisés comme proxy pour réaliser la communication entre le navigateur et le serveur gRPC.
REST API Dans de nombreux cas, vous souhaitez accéder à une application créée avec gRPC via l'API REST. Dans ce cas, grpc-gateway est pratique. Cela va être un peu long, donc je vais le résumer comme une autre entrée.
gRPC est un mécanisme très efficace pour construire un système via plusieurs applications, mais je pense qu'il y a de nombreux points peu clairs dans le fonctionnement réel, donc je l'ai résumé. De plus, je mettrai à jour cette entrée au fur et à mesure que je gagnerai en connaissances.
Recommended Posts