Envoi et réception de messages simples à l'aide du composant Kafka du framework Apache Camel (édition Java DSL)

introduction

Apache kafka est un système de messagerie distribué open source qui peut publier / souscrire un grand nombre de messages à haute vitesse. Ici, nous allons créer une application qui échange des messages avec kafka en utilisant Apache Camel, qui est un framework intégré. Utilisez le composant camel-kafka pour vous connecter à kafka avec Apache Camel.

Dans cet environnement, kafka utilise la v2.0.0. Reportez-vous à l'article suivant pour savoir comment créer l'environnement.

De plus, j'ai déjà écrit une application similaire en XML DSL dans le prochain article, mais cette fois je vais créer une application utilisant Java DSL. Aussi, je voudrais écrire plus en détail que la dernière fois.

Description de l'application à créer

L'application créée cette fois publie le message de date et d'heure sur kafka toutes les secondes comme indiqué dans la figure ci-dessous, et souscrit le même message de kafka.

image.png

Avec Apache Camel, une telle application utilisant kafka peut être implémentée avec seulement quelques dizaines de lignes de code ci-dessous. Ce n'est pas joli du code car j'ai mis tout le code dans la fonction principale, mais j'espère que vous pouvez voir qu'il peut être facilement implémenté par Camel.

	public static void main(String[] args) {
		try {
			CamelContext context = new DefaultCamelContext();
			KafkaComponent kafka = new KafkaComponent();
			kafka.setBrokers("kafka1:9092,kafka2:9092,kafka3:9092"); // kafka1, kafka2,Enregistrer 3 serveurs de kafka3 en tant que courtiers
			context.addComponent("kafka", kafka);

			context.addRoutes(new RouteBuilder() {
				public void configure() {
					from("timer:trigger?period=1000") //Exécuter toutes les 1000 millisecondes
							.routeId("kafka_producer_route")
							.setBody(simple("${date:now:yyyy-MM-dd HH:mm:ss}")) //Réglez la date et l'heure actuelles du CORPS du message
							.to("kafka:test01"); //Publier le message dans le sujet test01

					from("kafka:test01") //Abonnez-vous au message du sujet test01
							.routeId("kafka_consumer_route")
							.log("body = ${body}"); //Afficher le contenu BODY du message dans le journal
				}
			});

			context.start();
			Thread.sleep(10000);
			context.stop();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

Créons maintenant une application kafka.

Ajouter une bibliothèque pour utiliser kafka

Pour Maven, ajoutez la bibliothèque suivante à pom.xml. camel-kafka est un composant pour gérer Kafka dans Camel, et $ {camel.version} spécifie la version de camel que vous utilisez. kafka-clients est une bibliothèque cliente pour Kafka.

pom.xml


		<dependency>
			<groupId>org.apache.camel</groupId>
			<artifactId>camel-kafka</artifactId>
			<version>${camel.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-clients</artifactId>
			<version>2.0.0</version>
		</dependency>

URI du composant

L'URI pour utiliser le composant camel-kafka ressemble à ceci:

kafka:topic[?options]

Spécifiez le nom du sujet cible dans le chemin du contexte (sujet).

Créer un itinéraire pour publier / souscrire des messages

Créez un itinéraire pour publier le message sur kafka. La route écrite en Java DSL est la suivante.

Le sujet est créé automatiquement, mais si vous souhaitez modifier les paramètres du sujet, créez-le manuellement. Le code est trop simple à expliquer.

				from("timer:trigger?period=1000") //Exécuter toutes les 1000 millisecondes
						.routeId("kafka_producer_route")
						.setBody(simple("${date:now:yyyy-MM-dd HH:mm:ss}")) //Réglez la date et l'heure actuelles du CORPS du message
						.to("kafka:test01"); //Publier le message dans le sujet test01

Ensuite, créez une route qui abonne le message à kafka. La route écrite en Java DSL est la suivante.

				from("kafka:test01") //Abonnez-vous au message du sujet test01
						.routeId("kafka_consumer_route")
						.log("body = ${body}"); //Afficher le contenu BODY du message dans le journal

Puisqu'il n'est pas possible de se connecter à kafka avec cela seul, écrivez les paramètres de connexion. Tout d'abord, créez une instance de Kafka Configuration pour enregistrer les paramètres kafka et configurez les paramètres pour vous connecter à kafka. Il existe différents éléments de réglage, et je voudrais les expliquer brièvement plus tard. Dans l'exemple ci-dessous, certains paramètres sont définis, mais si vous souhaitez simplement vous connecter, configurez simplement le courtier auquel se connecter avec la méthode setBrokers. À part cela, la valeur par défaut est utilisée et vous n'avez pas besoin de la définir.

		KafkaConfiguration kafkaConfig = new KafkaConfiguration();
		kafkaConfig.setBrokers("kafka1:9092,kafka2:9092,kafka3:9092"); //Connectez-vous à 3 courtiers Kafka
		kafkaConfig.setGroupId("group1"); //Définir l'ID du groupe de consommateurs
		kafkaConfig.setAutoCommitEnable(true); //Activer l'autocommit
		kafkaConfig.setAutoCommitIntervalMs(5000); //Définir l'intervalle de validation automatique
		kafkaConfig.setAutoOffsetReset("earliest"); //Comportement lors de l'abonnement à une partition sans décalage engagé
		kafkaConfig.setRequestRequiredAcks("all"); //Conditions pour juger que la publication a réussi
		kafkaConfig.setConsumersCount(1); //Nombre de consommateurs

Ensuite, créez une instance de KafkaComponent et définissez l'instance de KafkaConfiguration créée précédemment.

		KafkaComponent kafka = new KafkaComponent();
		kafka.setConfiguration(kafkaConfig);

C'est le seul paramètre de connexion à kafka.

Cela termine les itinéraires de l'éditeur et du consommateur et les paramètres de connexion kafka. La source entière qui a créé la fonction principale etc. qui les utilise est la suivante.

package example.camelbegginer.kafka;

import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.kafka.KafkaComponent;
import org.apache.camel.component.kafka.KafkaConfiguration;
import org.apache.camel.impl.DefaultCamelContext;

public class KafkaExample {

	public static void main(String[] args) {
		try {
			CamelContext context = new DefaultCamelContext();
			context.addComponent("kafka", createKafkaComponent());
			context.addRoutes(createProducerRouteBuilder());
			context.addRoutes(createConsumerRouteBuilder());

			context.start();
			Thread.sleep(10000);
			context.stop();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	static KafkaComponent createKafkaComponent() {
		KafkaConfiguration kafkaConfig = new KafkaConfiguration();
		kafkaConfig.setBrokers("kafka1:9092,kafka2:9092,kafka3:9092"); //Connectez-vous à 3 courtiers Kafka
		kafkaConfig.setGroupId("group1"); //Définir l'ID de groupe
		kafkaConfig.setAutoCommitEnable(true); //Activer l'autocommit
		kafkaConfig.setAutoCommitIntervalMs(5000); //Définir l'intervalle de validation automatique
		kafkaConfig.setAutoOffsetReset("earliest"); //Comportement lors de la lecture d'une partition sans décalage engagé
		kafkaConfig.setRequestRequiredAcks("all"); //Conditions pour juger que la publication a réussi
		kafkaConfig.setConsumersCount(1); //Nombre de consommateurs

		KafkaComponent kafka = new KafkaComponent();
		kafka.setConfiguration(kafkaConfig);

		return kafka;
	}

	static RouteBuilder createProducerRouteBuilder() {
		return new RouteBuilder() {
			public void configure() throws Exception {
				from("timer:trigger?period=1000") //Exécuter toutes les 1000 millisecondes
						.routeId("kafka_producer_route")
						.setBody(simple("${date:now:yyyy-MM-dd HH:mm:ss}")) //Réglez la date et l'heure actuelles du CORPS du message
						.to("kafka:test01"); //Publier le message dans le sujet test01
			}
		};
	}

	static RouteBuilder createConsumerRouteBuilder() {
		return new RouteBuilder() {
			public void configure() throws Exception {
				from("kafka:test01") //Abonnez-vous au message du sujet test01
						.routeId("kafka_consumer_route")
						.log("body = ${body}"); //Afficher le contenu BODY du message dans le journal
			}
		};
	}
}

Exécutez l'application Camel que vous avez créée

Lorsque cette application est exécutée, le journal est sorti toutes les secondes comme indiqué ci-dessous.

[2019-01-31 21:15:38.112], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, body = 2019-01-31 21:15:38
[2019-01-31 21:15:39.031], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, body = 2019-01-31 21:15:39
[2019-01-31 21:15:40.034], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, body = 2019-01-31 21:15:40
[2019-01-31 21:15:41.026], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, body = 2019-01-31 21:15:41
[2019-01-31 21:15:42.029], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, body = 2019-01-31 21:15:42
[2019-01-31 21:15:43.024], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, body = 2019-01-31 21:15:43
[2019-01-31 21:15:44.044], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, body = 2019-01-31 21:15:44
[2019-01-31 21:15:45.028], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, body = 2019-01-31 21:15:45
[2019-01-31 21:15:46.032], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, body = 2019-01-31 21:15:46

En outre, si vous sortez l'en-tête avec le code suivant, le décalage du message et les informations de partition seront affichés dans le journal.

				from("kafka:test01") //Abonnez-vous au message du sujet test01
						.routeId("kafka_consumer_route")
						.log("body = ${body}") //Afficher le contenu BODY du message dans le journal
						.log("${headers}"); //★ Ajouter ici

Le journal de sortie est le suivant.

2019-02-01 10:10:46.236], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, {breadcrumbId=[B@58a2fc46, kafka.HEADERS=RecordHeaders(headers = [RecordHeader(key = breadcrumbId, value = [73, 68, 45, 109, 107, 121, 45, 80, 67, 45, 49, 53, 52, 56, 57, 56, 51, 52, 52, 51, 56, 55, 51, 45, 48, 45, 49])], isReadOnly = false), kafka.OFFSET=52, kafka.PARTITION=0, kafka.TIMESTAMP=1548983446195, kafka.TOPIC=test01}

De cette façon, vous pouvez utiliser le composant Kafka du framework Apache Camel pour créer une application qui envoie et reçoit des messages simples. Avez-vous compris que l'utilisation de Camel facilite la connexion avec kafka?

Principales propriétés du composant camel-kafka

Enfin, j'expliquerai les principales propriétés du composant camel-kafka. Il existe de nombreuses propriétés autres que le tableau ci-dessous, veuillez vous référer à la page officielle pour plus de détails.

Il existe de nombreuses propriétés, mais certaines sont les mêmes que celles utilisées par le client kafka, vous pouvez donc les utiliser sans aucune gêne.

Nom de la propriété producer/consumer La description Valeur par défaut Moule
brokers Commun Spécifiez le courtier Kafka. Le format est host1:Lors de la spécification de plusieurs courtiers sur le port1, hôte1:port1,host2:Séparé de port2 et virgule. String
clientId Commun L'ID client est une chaîne spécifiée par l'utilisateur pour le suivi des appels depuis l'application du client. String
autoCommitEnable consumer S'il est défini sur true, le consommateur validera automatiquement le décalage du message récupéré sur une base régulière. L'intervalle de validation automatique est spécifié avec l'option autoCommitIntervalMs. TRUE Boolean
autoCommitIntervalMs consumer Spécifie l'intervalle (en millisecondes) auquel le consommateur valide automatiquement le décalage du message. 5000 Integer
autoCommitOnStop consumer Spécifie s'il faut valider automatiquement automatiquement le dernier message abonné du courtier lorsque le consommateur tombe en panne. Cela nécessite que l'option autoCommitEnable soit activée. Les valeurs possibles sont sync, async ou none. sync String
autoOffsetReset consumer S'il n'y a pas de décalage initial ou si le décalage est hors plage, procédez comme suit: au plus tôt: réinitialise le décalage au tout premier décalage. Latest: réinitialise automatiquement le décalage au dernier (dernier) décalage. échec: lève une exception au consommateur. latest String
consumerRequestTimeoutMs consumer La configuration contrôle la durée maximale pendant laquelle un client attend une réponse à une demande. Si aucune réponse n'est reçue avant l'expiration du délai, le client renvoie la demande selon les besoins ou échoue la demande lorsque les tentatives sont épuisées. 40000 Integer
consumersCount consumer Spécifie le nombre de souches qui se connectent au serveur kafka. 1 int
consumerStreams consumer Nombre de consommateurs simultanés. 10 int
groupId consumer Spécifie une chaîne qui identifie le groupe auquel appartient ce consommateur. La définition du même ID de groupe indique que plusieurs consommateurs sont tous dans le même groupe de consommateurs. Il s'agit d'une option obligatoire pour le consommateur. String
maxPollRecords consumer poll()Spécifie le nombre maximal d'enregistrements renvoyés en un seul appel à. 500 Integer
pollTimeoutMs consumer La valeur du délai (en millisecondes) utilisée lors de l'interrogation du KafkaConsumer. 5000 Long
compressionCodec producer Ce paramètre vous permet de spécifier le codec de compression pour les données générées par le producteur. Les valeurs possibles sont none, gzip et snappy. Par défaut, il n'est pas compressé (aucun). none
requestRequiredAcks producer Fixez les conditions pour considérer que la demande du producteur est terminée. Vous spécifierez le niveau de fiabilité du message. Il y a les trois valeurs suivantes à spécifier. acques=S'il est à 0, le message sera perdu si le courtier du leader tombe en panne, donc accepte normalement=Spécifiez 1 ou tous. ・ Acks=0producer considère que la transmission est terminée sans attendre un accusé de réception du serveur. Dans ce cas, nous ne pouvons garantir que kafka a reçu le message. Le décalage renvoyé pour chaque enregistrement est toujours-Réglez sur 1. ・ Acks=1 Le lecteur écrit le message localement, mais répond sans attendre la fin des autres abonnés. Dans ce cas, si le lecteur tombe en panne immédiatement après avoir approuvé le message, les autres abonnés ne pourront pas le répliquer et le message sera perdu. ・ Acks=all Attend la fin de toutes les autres écritures de réplica synchrone avant que le lecteur ne renvoie une réponse. Cela garantit que les messages ne sont pas perdus tant qu'au moins un réplica synchrone est actif. 1 String
requestTimeoutMs producer Le courtier demande avant de renvoyer une erreur au client (producteur).required.Il est temps d'attendre avant de répondre aux exigences d'acks. 305000 Integer
retries producer S'il est défini sur une valeur supérieure à 0, le client renvoie (réessaie) les enregistrements qui n'ont pas pu être envoyés en raison d'une erreur temporaire. 0 Integer

référence

Envoyer et recevoir des messages simples à l'aide du composant Kafka du framework Apache Camel (ancien article)

Recommended Posts

Envoi et réception de messages simples à l'aide du composant Kafka du framework Apache Camel (édition Java DSL)
Gestion des transactions du framework intégré "Apache Camel"
Analyser et objectiver JSON à l'aide de l'annotation @JsonProperty de la bibliothèque Java Jackson
Afficher le calendrier et le jour japonais en utilisant la classe standard java8
Soyez prudent avec les demandes et les réponses lors de l'utilisation de Serverless Framework avec Java
[Java] La partie déroutante de String et StringBuilder
Installation simple de nginx et Docker à l'aide d'ansible
J'ai comparé les caractéristiques de Java et .NET