Very simply
Because.
In addition to synchronous API calls, recent systems are increasingly using asynchronous messaging for scalability.
The problem here is how to manage the schema because it spans multiple systems.
In this case, something that is sharable and backwards compatible with the serialization format, such as the REST API definition, is desirable.
JSON Schema is one of the major JSON schema definitions often used for messaging, but the definition is by no means easy to handle.
Therefore, the candidate is Protocol Buffers, which is also used in gRPC.
Since Protocol Buffers is an IDL (Interface Definition Language), you can define the schema concisely without being bound by the conventional specifications for JSON Schema.
So, on Spring Cloud Stream, which is a framework of Spring messaging that is often used in Java, messaging by Protocol Buffers is performed. to watch.
Spring Cloud Stream supports Json as the default conversion format, but the MessageConverter
class is provided to support other formats.
So it ’s simple to do
We will implement concretely in order.
Since it is a sample, detailed error handling etc. are omitted, but the following MessageConverter implementation class is prepared.
public class ProtobufMessageConverter extends AbstractMessageConverter {
public ProtobufMessageConverter() {
//Prepare MIME for Protocol Buffers
super(new MimeType("application", "x-protobuf"));
}
@Override
protected boolean supports(Class<?> clazz) {
//Specify the superclass used for Protocol Buffers messages as the support class
return AbstractMessageLite.class.isAssignableFrom(clazz);
}
@Override
protected Object convertToInternal(Object payload, MessageHeaders headers, Object conversionHint) {
//Conversion to byte array with the function of Protocol Buffers
return ((AbstractMessageLite) payload).toByteArray();
}
@Override
@lombok.SneakyThrows
protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
Object payload = message.getPayload();
//Convert from the MethodParameter of conversionHint to the definition expected by the recipient
if (conversionHint instanceof MethodParameter) {
MethodParameter param = (MethodParameter) conversionHint;
param = param.nestedIfOptional();
if (Message.class.isAssignableFrom(param.getParameterType())) {
param = param.nested();
}
Class<?> clazz = param.getNestedParameterType();
Method parseFrom = clazz.getMethod("parseFrom", byte[].class);
return parseFrom.invoke(null, payload);
}
return payload;
}
}
Regarding the convertFromInternal method, I think there are other methods because it depends on the design of the message receiving side, but this time I will make it a simple implementation.
Define a bean in each application that uses the created MessageConverter.
@Configuration
class Config {
@Bean
@StreamMessageConverter
public MessageConverter messageConverter() {
return new ProtobufMessageConverter();
}
}
Set to recognize the content-type in the stream to be used.
application.yaml
spring:
cloud:
stream:
bindings:
output:
destination: tasks
content-type: application/x-protobuf #Specify the MIME of the created Protocol Buffers
producer:
partitionKeyExpression: "1"
application.yaml
spring:
cloud:
stream:
bindings:
input:
destination: tasks
content-type: application/x-protobuf #Specify the MIME of the created Protocol Buffers
This completes the support for Protocol Buffers.
The rest is the same as using Spring Cloud Stream and Protocol Buffers as usual.
task.proto
syntax = "proto3";
option java_multiple_files = true;
option java_package = "com.example.task";
message TaskCreated {
string id = 1;
string name = 2;
}
message TaskStarted {
string id = 1;
}
message TaskDone {
string id = 1;
}
@EnableBinding(Source.class)
@AllArgsConstructor
class Publisher {
Source source;
void create() {
TaskCreated task = // ...
Message<TaskCreated> message = new GenericMessage<>(task);
source.output().send(message);
}
void start() {
TaskStarted task = // ...
Message<TaskStarted> message = new GenericMessage<>(task);
source.output().send(message);
}
void done() {
TaskDone task = // ...
Message<TaskDone> message = new GenericMessage<>(task);
source.output().send(message);
}
}
@EnableBinding(Sink.class)
class Subscriber {
@StreamListener(Sink.INPUT)
void handle(TaskCreated message) {
System.out.println(message);
}
@StreamListener(Sink.INPUT)
void handle(TaskStarted message) {
System.out.println(message);
}
@StreamListener(Sink.INPUT)
void handle(TaskDone message) {
System.out.println(message);
}
}
> id: "xxx-1"
name: "Task 1"
> id: "xxx-1"
> id: "xxx-1"
By sending the Protocol Buffers class as a Message on the sending side and using @StreamListener on the receiving side to specify the class you want to receive, you can use it without being aware of the mechanism of messaging implementation.
Spring Cloud Stream is not limited to Protocol Buffers, but Message Converter allows you to use any format, so it is convenient because it enables code-independent messaging of applications as needed!
The source code used this time is below.
Recommended Posts