[JAVA] Prise en charge des tampons de protocole pour Spring Cloud Stream

Kobito.D5Bp80.png

Pourquoi Spring Cloud Stream et les tampons de protocole

Très simplement

_Je souhaite faciliter la messagerie _

Parce que.

En plus des appels d'API synchrones, les systèmes récents utilisent de plus en plus la messagerie asynchrone pour l'évolutivité et d'autres facteurs.

Le problème ici est de savoir comment gérer le schéma car il s'étend sur plusieurs systèmes.

Dans ce cas, quelque chose qui est partageable et rétrocompatible avec le format de sérialisation, comme la définition de l'API REST, est souhaitable.

La principale définition de schéma JSON souvent utilisée pour la messagerie est le schéma JSON, qui n'est en aucun cas facile à gérer.

Par conséquent, le candidat est Protocol Buffers, qui est également utilisé dans gRPC.

Étant donné que Protocol Buffers est un IDL (langage de définition d'interface), vous pouvez définir le schéma de manière concise sans être lié par les spécifications conventionnelles du schéma JSON.

Ainsi, sur Spring Cloud Stream, qui est un framework de messagerie Spring souvent utilisé en Java, la messagerie par Protocol Buffers est effectuée. regarder.

Mécanisme de base

Spring Cloud Stream prend en charge Json comme format de conversion par défaut, mais la classe MessageConverter est fournie pour prendre en charge d'autres formats.

C'est donc simple à faire

  1. Créez une implémentation MessageConverter de Protocol Buffers
  2. Enregistrement Bean dans votre application
  3. Spécifiez les tampons de protocole MIME pour le type de contenu du flux dans application.yaml (propriétés) Il y a 3 points.

Méthode de mise en œuvre

Nous mettrons en œuvre concrètement dans l'ordre.

1. Créez une implémentation MessageConverter de Protocol Buffers

Puisqu'il s'agit d'un exemple, la gestion détaillée des erreurs, etc. est omise, mais la classe d'implémentation MessageConverter suivante est préparée.

public class ProtobufMessageConverter extends AbstractMessageConverter {

	public ProtobufMessageConverter() {
	    //Préparer MIME pour les tampons de protocole
		super(new MimeType("application", "x-protobuf"));
	}

	@Override
	protected boolean supports(Class<?> clazz) {
	    //Spécifiez la super classe utilisée pour les messages Protocol Buffers comme classe de support
		return AbstractMessageLite.class.isAssignableFrom(clazz);
	}

	@Override
	protected Object convertToInternal(Object payload, MessageHeaders headers, Object conversionHint) {
	    //Conversion en tableau d'octets avec la fonction de tampons de protocole
		return ((AbstractMessageLite) payload).toByteArray();
	}

	@Override
	@lombok.SneakyThrows
	protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
		Object payload = message.getPayload();

		//Conversion de MethodParameter of conversionHint à la définition attendue par le destinataire
		if (conversionHint instanceof MethodParameter) {
			MethodParameter param = (MethodParameter) conversionHint;
			param = param.nestedIfOptional();
			if (Message.class.isAssignableFrom(param.getParameterType())) {
				param = param.nested();
			}
			Class<?> clazz = param.getNestedParameterType();
			Method parseFrom = clazz.getMethod("parseFrom", byte[].class);
			return parseFrom.invoke(null, payload);
		}
		return payload;
	}
}

En ce qui concerne la méthode convertFromInternal, je pense qu'il existe d'autres méthodes car cela dépend de la conception du côté réception du message, mais cette fois, je vais en faire une implémentation simple.

2. Enregistrement Bean dans votre application

Définissez un bean dans chaque application qui utilise le MessageConverter créé.

@Configuration
class Config {
	@Bean
	@StreamMessageConverter
	public MessageConverter messageConverter() {
		return new ProtobufMessageConverter();
	}
}

3. Spécifiez les tampons de protocole MIME pour le type de contenu du flux dans application.yaml (propriétés)

Défini pour reconnaître le type de contenu dans le flux à utiliser.

Expéditeur

application.yaml


spring:
  cloud:
    stream:
      bindings:
        output:
          destination: tasks
          content-type: application/x-protobuf #Spécifiez le MIME des tampons de protocole créés
          producer:
            partitionKeyExpression: "1"

Receveur

application.yaml


spring:
  cloud:
    stream:
      bindings:
        input:
          destination: tasks
          content-type: application/x-protobuf #Spécifiez le MIME des tampons de protocole créés

Ceci termine la prise en charge des tampons de protocole.

Comment utiliser

Le reste est identique à l'utilisation de Spring Cloud Stream et de Protocol Buffers.

Définition de schéma

task.proto


syntax = "proto3";

option java_multiple_files = true;
option java_package = "com.example.task";

message TaskCreated {
    string id = 1;
    string name = 2;
}

message TaskStarted {
    string id = 1;
}

message TaskDone {
    string id = 1;
}

Processus de transmission

@EnableBinding(Source.class)
@AllArgsConstructor
class Publisher {

	Source source;

	void create() {
		TaskCreated task = // ...
		Message<TaskCreated> message = new GenericMessage<>(task);
		source.output().send(message);
	}

	void start() {
		TaskStarted task = // ...
		Message<TaskStarted> message = new GenericMessage<>(task);
		source.output().send(message);
	}

	void done() {
		TaskDone task = // ...
		Message<TaskDone> message = new GenericMessage<>(task);
		source.output().send(message);
	}
}

Traitement de la réception

@EnableBinding(Sink.class)
class Subscriber {

	@StreamListener(Sink.INPUT)
	void handle(TaskCreated message) {
		System.out.println(message);
	}

	@StreamListener(Sink.INPUT)
	void handle(TaskStarted message) {
		System.out.println(message);
	}

	@StreamListener(Sink.INPUT)
	void handle(TaskDone message) {
		System.out.println(message);
	}
}

production

> id: "xxx-1"
name: "Task 1"

> id: "xxx-1"

> id: "xxx-1"

L'expéditeur envoie la classe Protocol Buffers en tant que message et le récepteur utilise @StreamListener pour spécifier la classe que vous souhaitez recevoir, afin que vous puissiez l'utiliser sans connaître le mécanisme d'implémentation de la messagerie.

Spring Cloud Stream n'est pas limité aux tampons de protocole, mais Message Converter vous permet d'utiliser n'importe quel format, il est donc pratique car il permet la messagerie des applications indépendante du code selon les besoins!

Code source

Le code source utilisé cette fois est ci-dessous.

Recommended Posts

Prise en charge des tampons de protocole pour Spring Cloud Stream
Sortie de la démo Spring Cloud Stream
Plans pour prendre en charge JDK 11 pour Eclipse et Spring Boot
Essayez d'exécuter Spring Cloud Config pour le moment
Prise en charge multilingue de Spring Framework
Remarques sur les tampons de protocole
Mémo Spring Cloud Netflix
Microservices dans Spring Cloud