Très simplement
Parce que.
En plus des appels d'API synchrones, les systèmes récents utilisent de plus en plus la messagerie asynchrone pour l'évolutivité et d'autres facteurs.
Le problème ici est de savoir comment gérer le schéma car il s'étend sur plusieurs systèmes.
Dans ce cas, quelque chose qui est partageable et rétrocompatible avec le format de sérialisation, comme la définition de l'API REST, est souhaitable.
La principale définition de schéma JSON souvent utilisée pour la messagerie est le schéma JSON, qui n'est en aucun cas facile à gérer.
Par conséquent, le candidat est Protocol Buffers, qui est également utilisé dans gRPC.
Étant donné que Protocol Buffers est un IDL (langage de définition d'interface), vous pouvez définir le schéma de manière concise sans être lié par les spécifications conventionnelles du schéma JSON.
Ainsi, sur Spring Cloud Stream, qui est un framework de messagerie Spring souvent utilisé en Java, la messagerie par Protocol Buffers est effectuée. regarder.
Spring Cloud Stream prend en charge Json comme format de conversion par défaut, mais la classe MessageConverter
est fournie pour prendre en charge d'autres formats.
C'est donc simple à faire
Nous mettrons en œuvre concrètement dans l'ordre.
Puisqu'il s'agit d'un exemple, la gestion détaillée des erreurs, etc. est omise, mais la classe d'implémentation MessageConverter suivante est préparée.
public class ProtobufMessageConverter extends AbstractMessageConverter {
public ProtobufMessageConverter() {
//Préparer MIME pour les tampons de protocole
super(new MimeType("application", "x-protobuf"));
}
@Override
protected boolean supports(Class<?> clazz) {
//Spécifiez la super classe utilisée pour les messages Protocol Buffers comme classe de support
return AbstractMessageLite.class.isAssignableFrom(clazz);
}
@Override
protected Object convertToInternal(Object payload, MessageHeaders headers, Object conversionHint) {
//Conversion en tableau d'octets avec la fonction de tampons de protocole
return ((AbstractMessageLite) payload).toByteArray();
}
@Override
@lombok.SneakyThrows
protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
Object payload = message.getPayload();
//Conversion de MethodParameter of conversionHint à la définition attendue par le destinataire
if (conversionHint instanceof MethodParameter) {
MethodParameter param = (MethodParameter) conversionHint;
param = param.nestedIfOptional();
if (Message.class.isAssignableFrom(param.getParameterType())) {
param = param.nested();
}
Class<?> clazz = param.getNestedParameterType();
Method parseFrom = clazz.getMethod("parseFrom", byte[].class);
return parseFrom.invoke(null, payload);
}
return payload;
}
}
En ce qui concerne la méthode convertFromInternal, je pense qu'il existe d'autres méthodes car cela dépend de la conception du côté réception du message, mais cette fois, je vais en faire une implémentation simple.
Définissez un bean dans chaque application qui utilise le MessageConverter créé.
@Configuration
class Config {
@Bean
@StreamMessageConverter
public MessageConverter messageConverter() {
return new ProtobufMessageConverter();
}
}
Défini pour reconnaître le type de contenu dans le flux à utiliser.
application.yaml
spring:
cloud:
stream:
bindings:
output:
destination: tasks
content-type: application/x-protobuf #Spécifiez le MIME des tampons de protocole créés
producer:
partitionKeyExpression: "1"
application.yaml
spring:
cloud:
stream:
bindings:
input:
destination: tasks
content-type: application/x-protobuf #Spécifiez le MIME des tampons de protocole créés
Ceci termine la prise en charge des tampons de protocole.
Le reste est identique à l'utilisation de Spring Cloud Stream et de Protocol Buffers.
task.proto
syntax = "proto3";
option java_multiple_files = true;
option java_package = "com.example.task";
message TaskCreated {
string id = 1;
string name = 2;
}
message TaskStarted {
string id = 1;
}
message TaskDone {
string id = 1;
}
@EnableBinding(Source.class)
@AllArgsConstructor
class Publisher {
Source source;
void create() {
TaskCreated task = // ...
Message<TaskCreated> message = new GenericMessage<>(task);
source.output().send(message);
}
void start() {
TaskStarted task = // ...
Message<TaskStarted> message = new GenericMessage<>(task);
source.output().send(message);
}
void done() {
TaskDone task = // ...
Message<TaskDone> message = new GenericMessage<>(task);
source.output().send(message);
}
}
@EnableBinding(Sink.class)
class Subscriber {
@StreamListener(Sink.INPUT)
void handle(TaskCreated message) {
System.out.println(message);
}
@StreamListener(Sink.INPUT)
void handle(TaskStarted message) {
System.out.println(message);
}
@StreamListener(Sink.INPUT)
void handle(TaskDone message) {
System.out.println(message);
}
}
> id: "xxx-1"
name: "Task 1"
> id: "xxx-1"
> id: "xxx-1"
L'expéditeur envoie la classe Protocol Buffers en tant que message et le récepteur utilise @StreamListener pour spécifier la classe que vous souhaitez recevoir, afin que vous puissiez l'utiliser sans connaître le mécanisme d'implémentation de la messagerie.
Spring Cloud Stream n'est pas limité aux tampons de protocole, mais Message Converter vous permet d'utiliser n'importe quel format, il est donc pratique car il permet la messagerie des applications indépendante du code selon les besoins!
Le code source utilisé cette fois est ci-dessous.
Recommended Posts