Senden und Empfangen einfacher Nachrichten mit der Kafka-Komponente des Apache Camel-Frameworks (Java DSL Edition)

Einführung

Apache kafka ist ein verteiltes Open Source-Nachrichtensystem, das eine große Anzahl von Nachrichten mit hoher Geschwindigkeit veröffentlichen / abonnieren kann. Hier erstellen wir eine Anwendung, die mithilfe von Apache Camel, einem integrierten Framework, Nachrichten mit kafka austauscht. Verwenden Sie die Camel-Kafka-Komponente, um mit Apache Camel eine Verbindung zu Kafka herzustellen.

In dieser Umgebung verwendet kafka v2.0.0. Weitere Informationen zum Erstellen der Umgebung finden Sie im folgenden Artikel.

Ich habe bereits im nächsten Artikel eine ähnliche Anwendung in XML DSL geschrieben, aber dieses Mal werde ich eine Anwendung mit Java DSL erstellen. Außerdem möchte ich ausführlicher als beim letzten Mal schreiben.

Beschreibung der zu erstellenden Anwendung

Die diesmal erstellte Anwendung veröffentlicht die Datums- und Uhrzeitnachricht jede Sekunde an kafka, wie in der folgenden Abbildung gezeigt, und abonniert dieselbe Nachricht von kafka.

image.png

Mit Apache Camel kann eine solche Anwendung mit Kafka mit nur wenigen Dutzend Codezeilen implementiert werden. Es ist kein hübscher Code, weil ich den gesamten Code in die Hauptfunktion eingefügt habe, aber ich hoffe, Sie können sehen, dass er von Camel einfach implementiert werden kann.

	public static void main(String[] args) {
		try {
			CamelContext context = new DefaultCamelContext();
			KafkaComponent kafka = new KafkaComponent();
			kafka.setBrokers("kafka1:9092,kafka2:9092,kafka3:9092"); // kafka1, kafka2,Registrieren Sie 3 Server von kafka3 als Broker
			context.addComponent("kafka", kafka);

			context.addRoutes(new RouteBuilder() {
				public void configure() {
					from("timer:trigger?period=1000") //Führen Sie alle 1000 Millisekunden aus
							.routeId("kafka_producer_route")
							.setBody(simple("${date:now:yyyy-MM-dd HH:mm:ss}")) //Stellen Sie das aktuelle Datum und die aktuelle Uhrzeit für den KÖRPER der Nachricht ein
							.to("kafka:test01"); //Nachricht zum Thema test01 veröffentlichen

					from("kafka:test01") //Nachricht vom Thema test01 abonnieren
							.routeId("kafka_consumer_route")
							.log("body = ${body}"); //Zeigen Sie den KÖRPER-Inhalt der Nachricht im Protokoll an
				}
			});

			context.start();
			Thread.sleep(10000);
			context.stop();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

Jetzt erstellen wir eine Kafka-Anwendung.

Fügen Sie eine Bibliothek hinzu, um kafka zu verwenden

Fügen Sie für Maven die folgende Bibliothek zu pom.xml hinzu. camel-kafka ist eine Komponente für den Umgang mit Kafka in Camel, und $ {camel.version} gibt die Version des verwendeten Kamels an. kafka-clients ist eine Clientbibliothek für Kafka.

pom.xml


		<dependency>
			<groupId>org.apache.camel</groupId>
			<artifactId>camel-kafka</artifactId>
			<version>${camel.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-clients</artifactId>
			<version>2.0.0</version>
		</dependency>

Komponenten-URI

Die URI für die Verwendung der Kamel-Kafka-Komponente sieht folgendermaßen aus:

kafka:topic[?options]

Geben Sie den Namen des Zielthemas im Kontextpfad (Thema) an.

Erstellen Sie eine Route zum Veröffentlichen / Abonnieren von Nachrichten

Erstellen Sie eine Route, um die Nachricht in kafka zu veröffentlichen. Die in Java DSL geschriebene Route lautet wie folgt.

Das Thema wird automatisch erstellt. Wenn Sie jedoch die Themeneinstellungen ändern möchten, erstellen Sie es manuell. Der Code ist zu einfach zu erklären.

				from("timer:trigger?period=1000") //Führen Sie alle 1000 Millisekunden aus
						.routeId("kafka_producer_route")
						.setBody(simple("${date:now:yyyy-MM-dd HH:mm:ss}")) //Stellen Sie das aktuelle Datum und die aktuelle Uhrzeit für den KÖRPER der Nachricht ein
						.to("kafka:test01"); //Nachricht zum Thema test01 veröffentlichen

Erstellen Sie als Nächstes eine Route, die die Nachricht an kafka abonniert. Die in Java DSL geschriebene Route lautet wie folgt.

				from("kafka:test01") //Nachricht vom Thema test01 abonnieren
						.routeId("kafka_consumer_route")
						.log("body = ${body}"); //Zeigen Sie den KÖRPER-Inhalt der Nachricht im Protokoll an

Da es allein nicht möglich ist, eine Verbindung zu kafka herzustellen, schreiben Sie die Einstellungen für die Verbindung. Erstellen Sie zunächst eine Instanz der Kafka-Konfiguration, die die Kafka-Einstellungen registriert, und konfigurieren Sie die Einstellungen für die Verbindung mit Kafka. Es gibt verschiedene Einstellungselemente, die ich später kurz erläutern möchte. Im folgenden Beispiel werden einige Einstellungen vorgenommen. Wenn Sie jedoch nur eine Verbindung herstellen möchten, legen Sie einfach den Broker fest, zu dem eine Verbindung mit der setBrokers-Methode hergestellt werden soll. Ansonsten wird der Standardwert verwendet und Sie müssen ihn nicht festlegen.

		KafkaConfiguration kafkaConfig = new KafkaConfiguration();
		kafkaConfig.setBrokers("kafka1:9092,kafka2:9092,kafka3:9092"); //Verbinden Sie sich mit 3 Kafka-Brokern
		kafkaConfig.setGroupId("group1"); //Legen Sie die Verbrauchergruppen-ID fest
		kafkaConfig.setAutoCommitEnable(true); //Aktivieren Sie die automatische Festschreibung
		kafkaConfig.setAutoCommitIntervalMs(5000); //Legen Sie das Intervall für das automatische Festschreiben fest
		kafkaConfig.setAutoOffsetReset("earliest"); //Verhalten beim Abonnieren einer Partition ohne festgeschriebenen Offset
		kafkaConfig.setRequestRequiredAcks("all"); //Die Bedingungen für die Beurteilung dieser Veröffentlichung waren erfolgreich
		kafkaConfig.setConsumersCount(1); //Anzahl der Verbraucher

Erstellen Sie als Nächstes eine Instanz von KafkaComponent und legen Sie die zuvor erstellte Instanz von KafkaConfiguration fest.

		KafkaComponent kafka = new KafkaComponent();
		kafka.setConfiguration(kafkaConfig);

Dies ist die einzige Einstellung für die Verbindung zu Kafka.

Damit sind die Publisher- und Consumer-Routen sowie die Kafka-Verbindungseinstellungen abgeschlossen. Die gesamte Quelle, die die Hauptfunktion usw. erstellt hat, die diese verwendet, ist wie folgt.

package example.camelbegginer.kafka;

import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.kafka.KafkaComponent;
import org.apache.camel.component.kafka.KafkaConfiguration;
import org.apache.camel.impl.DefaultCamelContext;

public class KafkaExample {

	public static void main(String[] args) {
		try {
			CamelContext context = new DefaultCamelContext();
			context.addComponent("kafka", createKafkaComponent());
			context.addRoutes(createProducerRouteBuilder());
			context.addRoutes(createConsumerRouteBuilder());

			context.start();
			Thread.sleep(10000);
			context.stop();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	static KafkaComponent createKafkaComponent() {
		KafkaConfiguration kafkaConfig = new KafkaConfiguration();
		kafkaConfig.setBrokers("kafka1:9092,kafka2:9092,kafka3:9092"); //Verbinden Sie sich mit 3 Kafka-Brokern
		kafkaConfig.setGroupId("group1"); //Gruppen-ID einstellen
		kafkaConfig.setAutoCommitEnable(true); //Aktivieren Sie die automatische Festschreibung
		kafkaConfig.setAutoCommitIntervalMs(5000); //Legen Sie das Intervall für das automatische Festschreiben fest
		kafkaConfig.setAutoOffsetReset("earliest"); //Verhalten beim Lesen einer Partition ohne festgeschriebenen Offset
		kafkaConfig.setRequestRequiredAcks("all"); //Die Bedingungen für die Beurteilung dieser Veröffentlichung waren erfolgreich
		kafkaConfig.setConsumersCount(1); //Anzahl der Verbraucher

		KafkaComponent kafka = new KafkaComponent();
		kafka.setConfiguration(kafkaConfig);

		return kafka;
	}

	static RouteBuilder createProducerRouteBuilder() {
		return new RouteBuilder() {
			public void configure() throws Exception {
				from("timer:trigger?period=1000") //Führen Sie alle 1000 Millisekunden aus
						.routeId("kafka_producer_route")
						.setBody(simple("${date:now:yyyy-MM-dd HH:mm:ss}")) //Stellen Sie das aktuelle Datum und die aktuelle Uhrzeit für den KÖRPER der Nachricht ein
						.to("kafka:test01"); //Nachricht zum Thema test01 veröffentlichen
			}
		};
	}

	static RouteBuilder createConsumerRouteBuilder() {
		return new RouteBuilder() {
			public void configure() throws Exception {
				from("kafka:test01") //Nachricht vom Thema test01 abonnieren
						.routeId("kafka_consumer_route")
						.log("body = ${body}"); //Zeigen Sie den KÖRPER-Inhalt der Nachricht im Protokoll an
			}
		};
	}
}

Führen Sie die von Ihnen erstellte Camel-Anwendung aus

Wenn diese Anwendung ausgeführt wird, wird das Protokoll wie unten gezeigt jede Sekunde ausgegeben.

[2019-01-31 21:15:38.112], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, body = 2019-01-31 21:15:38
[2019-01-31 21:15:39.031], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, body = 2019-01-31 21:15:39
[2019-01-31 21:15:40.034], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, body = 2019-01-31 21:15:40
[2019-01-31 21:15:41.026], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, body = 2019-01-31 21:15:41
[2019-01-31 21:15:42.029], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, body = 2019-01-31 21:15:42
[2019-01-31 21:15:43.024], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, body = 2019-01-31 21:15:43
[2019-01-31 21:15:44.044], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, body = 2019-01-31 21:15:44
[2019-01-31 21:15:45.028], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, body = 2019-01-31 21:15:45
[2019-01-31 21:15:46.032], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, body = 2019-01-31 21:15:46

Wenn Sie den Header mit dem folgenden Code ausgeben, werden der Nachrichtenoffset und die Partitionsinformationen in das Protokoll ausgegeben.

				from("kafka:test01") //Nachricht vom Thema test01 abonnieren
						.routeId("kafka_consumer_route")
						.log("body = ${body}") //Zeigen Sie den KÖRPER-Inhalt der Nachricht im Protokoll an
						.log("${headers}"); //★ Hier hinzufügen

Das Ausgabeprotokoll lautet wie folgt.

2019-02-01 10:10:46.236], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, {breadcrumbId=[B@58a2fc46, kafka.HEADERS=RecordHeaders(headers = [RecordHeader(key = breadcrumbId, value = [73, 68, 45, 109, 107, 121, 45, 80, 67, 45, 49, 53, 52, 56, 57, 56, 51, 52, 52, 51, 56, 55, 51, 45, 48, 45, 49])], isReadOnly = false), kafka.OFFSET=52, kafka.PARTITION=0, kafka.TIMESTAMP=1548983446195, kafka.TOPIC=test01}

Auf diese Weise können Sie die Kafka-Komponente des Apache Camel-Frameworks verwenden, um eine Anwendung zu erstellen, die einfache Nachrichten sendet und empfängt. Wussten Sie, dass die Verwendung von Camel die Verbindung mit kafka erleichtert?

Haupteigenschaften der Kamel-Kafka-Komponente

Abschließend werde ich die Haupteigenschaften der Kamel-Kafka-Komponente erläutern. Es gibt viele andere Eigenschaften als die folgende Tabelle. Weitere Informationen finden Sie auf der offiziellen Seite.

Es gibt viele Eigenschaften, aber einige sind die gleichen wie die vom Kafka-Client verwendeten, sodass Sie sie ohne Beschwerden verwenden können.

Name des Anwesens producer/consumer Erläuterung Standardwert Schimmel
brokers Verbreitet Geben Sie den Kafka-Broker an. Das Format ist host1:Wenn Sie mehrere Broker auf Port1, Host1 angeben:port1,host2:Getrennt von Port2 und Komma. String
clientId Verbreitet Die Client-ID ist eine benutzerdefinierte Zeichenfolge zum Verfolgen von Anrufen aus der Clientanwendung. String
autoCommitEnable consumer Wenn true festgelegt ist, schreibt der Verbraucher den Offset der abgerufenen Nachricht regelmäßig automatisch fest. Das Autocommit-Intervall wird mit der Option autoCommitIntervalMs angegeben. TRUE Boolean
autoCommitIntervalMs consumer Gibt das Intervall (Millisekunden) an, in dem der Verbraucher den Nachrichtenoffset automatisch festschreibt. 5000 Integer
autoCommitOnStop consumer Gibt an, ob die zuletzt abonnierte Nachricht vom Broker explizit automatisch festgeschrieben werden soll, wenn der Verbraucher ausfällt. Dazu muss die Option autoCommitEnable aktiviert sein. Mögliche Werte sind sync, async oder none. sync String
autoOffsetReset consumer Wenn es keinen anfänglichen Versatz gibt oder wenn der Versatz außerhalb des Bereichs liegt, gehen Sie wie folgt vor: frühestens: Setzt den Versatz auf den allerersten Versatz zurück. Neueste: Setzt den Versatz automatisch auf den letzten (letzten) Versatz zurück. fail: Löst eine Ausnahme für den Verbraucher aus. latest String
consumerRequestTimeoutMs consumer Die Konfiguration steuert die maximale Zeit, die ein Client auf die Beantwortung einer Anfrage wartet. Wenn vor Ablauf des Zeitlimits keine Antwort eingeht, sendet der Client die Anforderung nach Bedarf erneut oder schlägt die Anforderung fehl, wenn die Wiederholungsversuche erschöpft sind. 40000 Integer
consumersCount consumer Gibt die Anzahl der Stemmers an, die eine Verbindung zum Kafka-Server herstellen. 1 int
consumerStreams consumer Anzahl gleichzeitiger Verbraucher. 10 int
groupId consumer Gibt eine Zeichenfolge an, die die Gruppe angibt, zu der dieser Verbraucher gehört. Das Festlegen derselben Gruppen-ID zeigt an, dass sich mehrere Verbraucher in derselben Verbrauchergruppe befinden. Dies ist eine erforderliche Option für Verbraucher. String
maxPollRecords consumer poll()Gibt die maximale Anzahl von Datensätzen an, die in einem einzelnen Aufruf an zurückgegeben werden. 500 Integer
pollTimeoutMs consumer Der Timeout-Wert (Millisekunden), der beim Abrufen des KafkaConsumer verwendet wird. 5000 Long
compressionCodec producer Mit diesem Parameter können Sie den Komprimierungscodec für die vom Hersteller generierten Daten angeben. Mögliche Werte sind none, gzip und snappy. Standardmäßig ist es nicht komprimiert (keine). none
requestRequiredAcks producer Legen Sie die Bedingungen fest, unter denen berücksichtigt werden kann, dass die Anforderung des Herstellers abgeschlossen wurde. Sie geben den Zuverlässigkeitsgrad der Nachricht an. Es sind die folgenden drei Werte anzugeben. acks=Wenn es 0 ist, geht die Nachricht verloren, wenn der Broker des Anführers ausfällt, also normalerweise acks=Geben Sie 1 oder alle an. ・ Bestätigt=0producer betrachtet die Übertragung als abgeschlossen, ohne auf eine Bestätigung vom Server zu warten. In diesem Fall können wir nicht garantieren, dass kafka die Nachricht erhalten hat. Der für jeden Datensatz zurückgegebene Offset ist immer-Auf 1 setzen. ・ Bestätigt=1 Der Leser schreibt die Nachricht lokal, antwortet jedoch, ohne auf die Fertigstellung durch andere Follower zu warten. In diesem Fall können andere Follower die Nachricht nicht replizieren, wenn der Leser unmittelbar nach der Genehmigung der Nachricht ausfällt, und die Nachricht geht verloren. ・ Bestätigt=all Wartet, bis alle anderen synchronen Replikat-Schreibvorgänge abgeschlossen sind, bevor der Leser eine Antwort zurückgibt. Dadurch wird sichergestellt, dass Nachrichten nicht verloren gehen, solange mindestens ein synchrones Replikat aktiv ist. 1 String
requestTimeoutMs producer Der Broker fordert an, bevor er einen Fehler an den Client (Produzenten) zurücksendet..required.Wartezeit, bevor die Anforderungen erfüllt werden. 305000 Integer
retries producer Bei einem Wert größer als 0 sendet der Client erneut Datensätze (wiederholt sie), die aufgrund eines vorübergehenden Fehlers nicht gesendet werden konnten. 0 Integer

Referenz

Senden und Empfangen einfacher Nachrichten mit der Kafka-Komponente des Apache Camel-Frameworks (alter Artikel)

Recommended Posts

Senden und Empfangen einfacher Nachrichten mit der Kafka-Komponente des Apache Camel-Frameworks (Java DSL Edition)
Transaktionsmanagement des integrierten Frameworks "Apache Camel"
Analysieren und objektivieren Sie JSON mithilfe der Annotation @JsonProperty von Jackson, einer Java-Bibliothek
Zeigen Sie den japanischen Kalender und Tag mit der Java8-Standardklasse an
Seien Sie vorsichtig mit Anfragen und Antworten, wenn Sie das Serverless Framework mit Java verwenden
[Java] Der verwirrende Teil von String und StringBuilder
Einfache Installation von Nginx und Docker mit ansible
Ich habe die Eigenschaften von Java und .NET verglichen