[JAVA] Unterstützt Protokollpuffer für Spring Cloud Stream

Kobito.D5Bp80.png

Warum Spring Cloud Stream- und Protokollpuffer?

Sehr einfach

_Ich möchte das Messaging einfacher machen _

Weil.

Zusätzlich zu synchronen API-Aufrufen verwenden neuere Systeme zunehmend asynchrones Messaging, um die Skalierbarkeit und andere Faktoren zu gewährleisten.

Das Problem hierbei ist, wie das Schema verwaltet wird, da es mehrere Systeme umfasst.

In diesem Fall ist etwas wünschenswert, das gemeinsam genutzt werden kann und mit dem Serialisierungsformat abwärtskompatibel ist, z. B. die REST-API-Definition.

Die wichtigste JSON-Schemadefinition, die häufig für Messaging verwendet wird, ist das JSON-Schema, das keineswegs [einfach zu handhaben] ist (https://qiita.com/yugui/items/160737021d25d761b353#protobuf).

Daher ist der Kandidat Protokollpuffer, der auch in gRPC verwendet wird.

Da Protocol Buffers eine IDL (Interface Definition Language) ist, können Sie das Schema präzise definieren, ohne an die herkömmlichen Spezifikationen für das JSON-Schema gebunden zu sein.

In Spring Cloud Stream, einem in Java häufig verwendeten Framework für Spring-Messaging, wird das Messaging durch Protokollpuffer ausgeführt. schauen.

Grundmechanismus

Spring Cloud Stream unterstützt Json als Standardkonvertierungsformat, aber die Klasse "MessageConverter" wird bereitgestellt, um andere Formate zu unterstützen.

Das ist also ganz einfach

  1. Erstellen Sie eine MessageConverter-Implementierung von Protokollpuffern
  2. Bean Registrierung in Ihrer Anwendung
  3. Geben Sie in application.yaml (Propeties) das Protokollpuffer-MIME für den Inhaltstyp des Streams an. Es gibt 3 Punkte.

Implementierungsmethode

Wir werden konkret umsetzen.

1. Erstellen Sie eine MessageConverter-Implementierung von Protokollpuffern

Da es sich um ein Beispiel handelt, werden detaillierte Fehlerbehandlungen usw. weggelassen, aber die folgende MessageConverter-Implementierungsklasse wird vorbereitet.

public class ProtobufMessageConverter extends AbstractMessageConverter {

	public ProtobufMessageConverter() {
	    //Bereiten Sie MIME für Protokollpuffer vor
		super(new MimeType("application", "x-protobuf"));
	}

	@Override
	protected boolean supports(Class<?> clazz) {
	    //Geben Sie die für Protokollpuffernachrichten verwendete Superklasse als Unterstützungsklasse an
		return AbstractMessageLite.class.isAssignableFrom(clazz);
	}

	@Override
	protected Object convertToInternal(Object payload, MessageHeaders headers, Object conversionHint) {
	    //Konvertierung in ein Byte-Array mit der Funktion von Protokollpuffern
		return ((AbstractMessageLite) payload).toByteArray();
	}

	@Override
	@lombok.SneakyThrows
	protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
		Object payload = message.getPayload();

		//Konvertieren Sie vom MethodParameter von convertHint in die vom Empfänger erwartete Definition
		if (conversionHint instanceof MethodParameter) {
			MethodParameter param = (MethodParameter) conversionHint;
			param = param.nestedIfOptional();
			if (Message.class.isAssignableFrom(param.getParameterType())) {
				param = param.nested();
			}
			Class<?> clazz = param.getNestedParameterType();
			Method parseFrom = clazz.getMethod("parseFrom", byte[].class);
			return parseFrom.invoke(null, payload);
		}
		return payload;
	}
}

In Bezug auf die convertFromInternal-Methode gibt es meiner Meinung nach andere Methoden, da dies vom Design der Nachrichtenempfangsseite abhängt. Diesmal werde ich jedoch eine einfache Implementierung vornehmen.

2. Bean Registrierung in Ihrer Anwendung

Definieren Sie in jeder Anwendung, die den erstellten MessageConverter verwendet, eine Bean.

@Configuration
class Config {
	@Bean
	@StreamMessageConverter
	public MessageConverter messageConverter() {
		return new ProtobufMessageConverter();
	}
}

3. Geben Sie in application.yaml (Propeties) das Protokollpuffer-MIME für den Inhaltstyp des Streams an.

Legen Sie fest, dass der Inhaltstyp im zu verwendenden Stream erkannt wird.

Absender

application.yaml


spring:
  cloud:
    stream:
      bindings:
        output:
          destination: tasks
          content-type: application/x-protobuf #Geben Sie die MIME der erstellten Protokollpuffer an
          producer:
            partitionKeyExpression: "1"

Empfänger

application.yaml


spring:
  cloud:
    stream:
      bindings:
        input:
          destination: tasks
          content-type: application/x-protobuf #Geben Sie die MIME der erstellten Protokollpuffer an

Damit ist die Unterstützung für Protokollpuffer abgeschlossen.

Wie benutzt man

Der Rest entspricht der Verwendung von Spring Cloud Stream- und Protokollpuffern.

Schemadefinition

task.proto


syntax = "proto3";

option java_multiple_files = true;
option java_package = "com.example.task";

message TaskCreated {
    string id = 1;
    string name = 2;
}

message TaskStarted {
    string id = 1;
}

message TaskDone {
    string id = 1;
}

Übertragungsprozess

@EnableBinding(Source.class)
@AllArgsConstructor
class Publisher {

	Source source;

	void create() {
		TaskCreated task = // ...
		Message<TaskCreated> message = new GenericMessage<>(task);
		source.output().send(message);
	}

	void start() {
		TaskStarted task = // ...
		Message<TaskStarted> message = new GenericMessage<>(task);
		source.output().send(message);
	}

	void done() {
		TaskDone task = // ...
		Message<TaskDone> message = new GenericMessage<>(task);
		source.output().send(message);
	}
}

Empfangsverarbeitung

@EnableBinding(Sink.class)
class Subscriber {

	@StreamListener(Sink.INPUT)
	void handle(TaskCreated message) {
		System.out.println(message);
	}

	@StreamListener(Sink.INPUT)
	void handle(TaskStarted message) {
		System.out.println(message);
	}

	@StreamListener(Sink.INPUT)
	void handle(TaskDone message) {
		System.out.println(message);
	}
}

Ausgabe

> id: "xxx-1"
name: "Task 1"

> id: "xxx-1"

> id: "xxx-1"

Der Absender sendet die Protokollpufferklasse als Nachricht, und der Empfänger gibt @StreamListener an, um die Klasse anzugeben, die Sie empfangen möchten, sodass Sie sie verwenden können, ohne den Mechanismus der Nachrichtenimplementierung zu kennen.

Spring Cloud Stream ist nicht auf Protokollpuffer beschränkt, aber mit Message Converter können Sie jedes Format verwenden. Dies ist praktisch, da es das Code-unabhängige Messaging von Anwendungen nach Bedarf ermöglicht!

Quellcode

Der diesmal verwendete Quellcode ist unten.

Recommended Posts

Unterstützt Protokollpuffer für Spring Cloud Stream
Spring Cloud Stream Demo veröffentlicht
Pläne zur Unterstützung von JDK 11 für Eclipse und Spring Boot
Versuchen Sie vorerst, Spring Cloud Config auszuführen
Mehrsprachige Unterstützung für Spring Framework
Hinweise zu Protokollpuffern
Spring Cloud Netflix-Memo
Microservices in Spring Cloud