Sehr einfach
Weil.
Zusätzlich zu synchronen API-Aufrufen verwenden neuere Systeme zunehmend asynchrones Messaging, um die Skalierbarkeit und andere Faktoren zu gewährleisten.
Das Problem hierbei ist, wie das Schema verwaltet wird, da es mehrere Systeme umfasst.
In diesem Fall ist etwas wünschenswert, das gemeinsam genutzt werden kann und mit dem Serialisierungsformat abwärtskompatibel ist, z. B. die REST-API-Definition.
Die wichtigste JSON-Schemadefinition, die häufig für Messaging verwendet wird, ist das JSON-Schema, das keineswegs [einfach zu handhaben] ist (https://qiita.com/yugui/items/160737021d25d761b353#protobuf).
Daher ist der Kandidat Protokollpuffer, der auch in gRPC verwendet wird.
Da Protocol Buffers eine IDL (Interface Definition Language) ist, können Sie das Schema präzise definieren, ohne an die herkömmlichen Spezifikationen für das JSON-Schema gebunden zu sein.
In Spring Cloud Stream, einem in Java häufig verwendeten Framework für Spring-Messaging, wird das Messaging durch Protokollpuffer ausgeführt. schauen.
Spring Cloud Stream unterstützt Json als Standardkonvertierungsformat, aber die Klasse "MessageConverter" wird bereitgestellt, um andere Formate zu unterstützen.
Das ist also ganz einfach
Wir werden konkret umsetzen.
Da es sich um ein Beispiel handelt, werden detaillierte Fehlerbehandlungen usw. weggelassen, aber die folgende MessageConverter-Implementierungsklasse wird vorbereitet.
public class ProtobufMessageConverter extends AbstractMessageConverter {
public ProtobufMessageConverter() {
//Bereiten Sie MIME für Protokollpuffer vor
super(new MimeType("application", "x-protobuf"));
}
@Override
protected boolean supports(Class<?> clazz) {
//Geben Sie die für Protokollpuffernachrichten verwendete Superklasse als Unterstützungsklasse an
return AbstractMessageLite.class.isAssignableFrom(clazz);
}
@Override
protected Object convertToInternal(Object payload, MessageHeaders headers, Object conversionHint) {
//Konvertierung in ein Byte-Array mit der Funktion von Protokollpuffern
return ((AbstractMessageLite) payload).toByteArray();
}
@Override
@lombok.SneakyThrows
protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
Object payload = message.getPayload();
//Konvertieren Sie vom MethodParameter von convertHint in die vom Empfänger erwartete Definition
if (conversionHint instanceof MethodParameter) {
MethodParameter param = (MethodParameter) conversionHint;
param = param.nestedIfOptional();
if (Message.class.isAssignableFrom(param.getParameterType())) {
param = param.nested();
}
Class<?> clazz = param.getNestedParameterType();
Method parseFrom = clazz.getMethod("parseFrom", byte[].class);
return parseFrom.invoke(null, payload);
}
return payload;
}
}
In Bezug auf die convertFromInternal-Methode gibt es meiner Meinung nach andere Methoden, da dies vom Design der Nachrichtenempfangsseite abhängt. Diesmal werde ich jedoch eine einfache Implementierung vornehmen.
Definieren Sie in jeder Anwendung, die den erstellten MessageConverter verwendet, eine Bean.
@Configuration
class Config {
@Bean
@StreamMessageConverter
public MessageConverter messageConverter() {
return new ProtobufMessageConverter();
}
}
Legen Sie fest, dass der Inhaltstyp im zu verwendenden Stream erkannt wird.
application.yaml
spring:
cloud:
stream:
bindings:
output:
destination: tasks
content-type: application/x-protobuf #Geben Sie die MIME der erstellten Protokollpuffer an
producer:
partitionKeyExpression: "1"
application.yaml
spring:
cloud:
stream:
bindings:
input:
destination: tasks
content-type: application/x-protobuf #Geben Sie die MIME der erstellten Protokollpuffer an
Damit ist die Unterstützung für Protokollpuffer abgeschlossen.
Der Rest entspricht der Verwendung von Spring Cloud Stream- und Protokollpuffern.
task.proto
syntax = "proto3";
option java_multiple_files = true;
option java_package = "com.example.task";
message TaskCreated {
string id = 1;
string name = 2;
}
message TaskStarted {
string id = 1;
}
message TaskDone {
string id = 1;
}
@EnableBinding(Source.class)
@AllArgsConstructor
class Publisher {
Source source;
void create() {
TaskCreated task = // ...
Message<TaskCreated> message = new GenericMessage<>(task);
source.output().send(message);
}
void start() {
TaskStarted task = // ...
Message<TaskStarted> message = new GenericMessage<>(task);
source.output().send(message);
}
void done() {
TaskDone task = // ...
Message<TaskDone> message = new GenericMessage<>(task);
source.output().send(message);
}
}
@EnableBinding(Sink.class)
class Subscriber {
@StreamListener(Sink.INPUT)
void handle(TaskCreated message) {
System.out.println(message);
}
@StreamListener(Sink.INPUT)
void handle(TaskStarted message) {
System.out.println(message);
}
@StreamListener(Sink.INPUT)
void handle(TaskDone message) {
System.out.println(message);
}
}
> id: "xxx-1"
name: "Task 1"
> id: "xxx-1"
> id: "xxx-1"
Der Absender sendet die Protokollpufferklasse als Nachricht, und der Empfänger gibt @StreamListener an, um die Klasse anzugeben, die Sie empfangen möchten, sodass Sie sie verwenden können, ohne den Mechanismus der Nachrichtenimplementierung zu kennen.
Spring Cloud Stream ist nicht auf Protokollpuffer beschränkt, aber mit Message Converter können Sie jedes Format verwenden. Dies ist praktisch, da es das Code-unabhängige Messaging von Anwendungen nach Bedarf ermöglicht!
Der diesmal verwendete Quellcode ist unten.
Recommended Posts