Apache kafka est un système de messagerie distribué open source qui peut publier / souscrire un grand nombre de messages à haute vitesse. Ici, nous allons créer une application qui échange des messages avec kafka en utilisant Apache Camel, qui est un framework intégré. Utilisez le composant camel-kafka pour vous connecter à kafka avec Apache Camel.
Dans cet environnement, kafka utilise la v2.0.0. Reportez-vous à l'article suivant pour savoir comment créer l'environnement.
De plus, j'ai déjà écrit une application similaire en XML DSL dans le prochain article, mais cette fois je vais créer une application utilisant Java DSL. Aussi, je voudrais écrire plus en détail que la dernière fois.
L'application créée cette fois publie le message de date et d'heure sur kafka toutes les secondes comme indiqué dans la figure ci-dessous, et souscrit le même message de kafka.
Avec Apache Camel, une telle application utilisant kafka peut être implémentée avec seulement quelques dizaines de lignes de code ci-dessous. Ce n'est pas joli du code car j'ai mis tout le code dans la fonction principale, mais j'espère que vous pouvez voir qu'il peut être facilement implémenté par Camel.
public static void main(String[] args) {
try {
CamelContext context = new DefaultCamelContext();
KafkaComponent kafka = new KafkaComponent();
kafka.setBrokers("kafka1:9092,kafka2:9092,kafka3:9092"); // kafka1, kafka2,Enregistrer 3 serveurs de kafka3 en tant que courtiers
context.addComponent("kafka", kafka);
context.addRoutes(new RouteBuilder() {
public void configure() {
from("timer:trigger?period=1000") //Exécuter toutes les 1000 millisecondes
.routeId("kafka_producer_route")
.setBody(simple("${date:now:yyyy-MM-dd HH:mm:ss}")) //Réglez la date et l'heure actuelles du CORPS du message
.to("kafka:test01"); //Publier le message dans le sujet test01
from("kafka:test01") //Abonnez-vous au message du sujet test01
.routeId("kafka_consumer_route")
.log("body = ${body}"); //Afficher le contenu BODY du message dans le journal
}
});
context.start();
Thread.sleep(10000);
context.stop();
} catch (Exception e) {
e.printStackTrace();
}
}
Créons maintenant une application kafka.
Pour Maven, ajoutez la bibliothèque suivante à pom.xml. camel-kafka est un composant pour gérer Kafka dans Camel, et $ {camel.version} spécifie la version de camel que vous utilisez. kafka-clients est une bibliothèque cliente pour Kafka.
pom.xml
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-kafka</artifactId>
<version>${camel.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.0</version>
</dependency>
L'URI pour utiliser le composant camel-kafka ressemble à ceci:
kafka:topic[?options]
Spécifiez le nom du sujet cible dans le chemin du contexte (sujet).
Créez un itinéraire pour publier le message sur kafka. La route écrite en Java DSL est la suivante.
Le sujet est créé automatiquement, mais si vous souhaitez modifier les paramètres du sujet, créez-le manuellement. Le code est trop simple à expliquer.
from("timer:trigger?period=1000") //Exécuter toutes les 1000 millisecondes
.routeId("kafka_producer_route")
.setBody(simple("${date:now:yyyy-MM-dd HH:mm:ss}")) //Réglez la date et l'heure actuelles du CORPS du message
.to("kafka:test01"); //Publier le message dans le sujet test01
Ensuite, créez une route qui abonne le message à kafka. La route écrite en Java DSL est la suivante.
from("kafka:test01") //Abonnez-vous au message du sujet test01
.routeId("kafka_consumer_route")
.log("body = ${body}"); //Afficher le contenu BODY du message dans le journal
Puisqu'il n'est pas possible de se connecter à kafka avec cela seul, écrivez les paramètres de connexion. Tout d'abord, créez une instance de Kafka Configuration pour enregistrer les paramètres kafka et configurez les paramètres pour vous connecter à kafka. Il existe différents éléments de réglage, et je voudrais les expliquer brièvement plus tard. Dans l'exemple ci-dessous, certains paramètres sont définis, mais si vous souhaitez simplement vous connecter, configurez simplement le courtier auquel se connecter avec la méthode setBrokers. À part cela, la valeur par défaut est utilisée et vous n'avez pas besoin de la définir.
KafkaConfiguration kafkaConfig = new KafkaConfiguration();
kafkaConfig.setBrokers("kafka1:9092,kafka2:9092,kafka3:9092"); //Connectez-vous à 3 courtiers Kafka
kafkaConfig.setGroupId("group1"); //Définir l'ID du groupe de consommateurs
kafkaConfig.setAutoCommitEnable(true); //Activer l'autocommit
kafkaConfig.setAutoCommitIntervalMs(5000); //Définir l'intervalle de validation automatique
kafkaConfig.setAutoOffsetReset("earliest"); //Comportement lors de l'abonnement à une partition sans décalage engagé
kafkaConfig.setRequestRequiredAcks("all"); //Conditions pour juger que la publication a réussi
kafkaConfig.setConsumersCount(1); //Nombre de consommateurs
Ensuite, créez une instance de KafkaComponent et définissez l'instance de KafkaConfiguration créée précédemment.
KafkaComponent kafka = new KafkaComponent();
kafka.setConfiguration(kafkaConfig);
C'est le seul paramètre de connexion à kafka.
Cela termine les itinéraires de l'éditeur et du consommateur et les paramètres de connexion kafka. La source entière qui a créé la fonction principale etc. qui les utilise est la suivante.
package example.camelbegginer.kafka;
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.kafka.KafkaComponent;
import org.apache.camel.component.kafka.KafkaConfiguration;
import org.apache.camel.impl.DefaultCamelContext;
public class KafkaExample {
public static void main(String[] args) {
try {
CamelContext context = new DefaultCamelContext();
context.addComponent("kafka", createKafkaComponent());
context.addRoutes(createProducerRouteBuilder());
context.addRoutes(createConsumerRouteBuilder());
context.start();
Thread.sleep(10000);
context.stop();
} catch (Exception e) {
e.printStackTrace();
}
}
static KafkaComponent createKafkaComponent() {
KafkaConfiguration kafkaConfig = new KafkaConfiguration();
kafkaConfig.setBrokers("kafka1:9092,kafka2:9092,kafka3:9092"); //Connectez-vous à 3 courtiers Kafka
kafkaConfig.setGroupId("group1"); //Définir l'ID de groupe
kafkaConfig.setAutoCommitEnable(true); //Activer l'autocommit
kafkaConfig.setAutoCommitIntervalMs(5000); //Définir l'intervalle de validation automatique
kafkaConfig.setAutoOffsetReset("earliest"); //Comportement lors de la lecture d'une partition sans décalage engagé
kafkaConfig.setRequestRequiredAcks("all"); //Conditions pour juger que la publication a réussi
kafkaConfig.setConsumersCount(1); //Nombre de consommateurs
KafkaComponent kafka = new KafkaComponent();
kafka.setConfiguration(kafkaConfig);
return kafka;
}
static RouteBuilder createProducerRouteBuilder() {
return new RouteBuilder() {
public void configure() throws Exception {
from("timer:trigger?period=1000") //Exécuter toutes les 1000 millisecondes
.routeId("kafka_producer_route")
.setBody(simple("${date:now:yyyy-MM-dd HH:mm:ss}")) //Réglez la date et l'heure actuelles du CORPS du message
.to("kafka:test01"); //Publier le message dans le sujet test01
}
};
}
static RouteBuilder createConsumerRouteBuilder() {
return new RouteBuilder() {
public void configure() throws Exception {
from("kafka:test01") //Abonnez-vous au message du sujet test01
.routeId("kafka_consumer_route")
.log("body = ${body}"); //Afficher le contenu BODY du message dans le journal
}
};
}
}
Lorsque cette application est exécutée, le journal est sorti toutes les secondes comme indiqué ci-dessous.
[2019-01-31 21:15:38.112], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, body = 2019-01-31 21:15:38
[2019-01-31 21:15:39.031], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, body = 2019-01-31 21:15:39
[2019-01-31 21:15:40.034], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, body = 2019-01-31 21:15:40
[2019-01-31 21:15:41.026], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, body = 2019-01-31 21:15:41
[2019-01-31 21:15:42.029], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, body = 2019-01-31 21:15:42
[2019-01-31 21:15:43.024], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, body = 2019-01-31 21:15:43
[2019-01-31 21:15:44.044], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, body = 2019-01-31 21:15:44
[2019-01-31 21:15:45.028], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, body = 2019-01-31 21:15:45
[2019-01-31 21:15:46.032], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, body = 2019-01-31 21:15:46
En outre, si vous sortez l'en-tête avec le code suivant, le décalage du message et les informations de partition seront affichés dans le journal.
from("kafka:test01") //Abonnez-vous au message du sujet test01
.routeId("kafka_consumer_route")
.log("body = ${body}") //Afficher le contenu BODY du message dans le journal
.log("${headers}"); //★ Ajouter ici
Le journal de sortie est le suivant.
2019-02-01 10:10:46.236], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, {breadcrumbId=[B@58a2fc46, kafka.HEADERS=RecordHeaders(headers = [RecordHeader(key = breadcrumbId, value = [73, 68, 45, 109, 107, 121, 45, 80, 67, 45, 49, 53, 52, 56, 57, 56, 51, 52, 52, 51, 56, 55, 51, 45, 48, 45, 49])], isReadOnly = false), kafka.OFFSET=52, kafka.PARTITION=0, kafka.TIMESTAMP=1548983446195, kafka.TOPIC=test01}
De cette façon, vous pouvez utiliser le composant Kafka du framework Apache Camel pour créer une application qui envoie et reçoit des messages simples. Avez-vous compris que l'utilisation de Camel facilite la connexion avec kafka?
Enfin, j'expliquerai les principales propriétés du composant camel-kafka. Il existe de nombreuses propriétés autres que le tableau ci-dessous, veuillez vous référer à la page officielle pour plus de détails.
Il existe de nombreuses propriétés, mais certaines sont les mêmes que celles utilisées par le client kafka, vous pouvez donc les utiliser sans aucune gêne.
Nom de la propriété | producer/consumer | La description | Valeur par défaut | Moule |
---|---|---|---|---|
brokers | Commun | Spécifiez le courtier Kafka. Le format est host1:Lors de la spécification de plusieurs courtiers sur le port1, hôte1:port1,host2:Séparé de port2 et virgule. | String | |
clientId | Commun | L'ID client est une chaîne spécifiée par l'utilisateur pour le suivi des appels depuis l'application du client. | String | |
autoCommitEnable | consumer | S'il est défini sur true, le consommateur validera automatiquement le décalage du message récupéré sur une base régulière. L'intervalle de validation automatique est spécifié avec l'option autoCommitIntervalMs. | TRUE | Boolean |
autoCommitIntervalMs | consumer | Spécifie l'intervalle (en millisecondes) auquel le consommateur valide automatiquement le décalage du message. | 5000 | Integer |
autoCommitOnStop | consumer | Spécifie s'il faut valider automatiquement automatiquement le dernier message abonné du courtier lorsque le consommateur tombe en panne. Cela nécessite que l'option autoCommitEnable soit activée. Les valeurs possibles sont sync, async ou none. | sync | String |
autoOffsetReset | consumer | S'il n'y a pas de décalage initial ou si le décalage est hors plage, procédez comme suit: au plus tôt: réinitialise le décalage au tout premier décalage. Latest: réinitialise automatiquement le décalage au dernier (dernier) décalage. échec: lève une exception au consommateur. | latest | String |
consumerRequestTimeoutMs | consumer | La configuration contrôle la durée maximale pendant laquelle un client attend une réponse à une demande. Si aucune réponse n'est reçue avant l'expiration du délai, le client renvoie la demande selon les besoins ou échoue la demande lorsque les tentatives sont épuisées. | 40000 | Integer |
consumersCount | consumer | Spécifie le nombre de souches qui se connectent au serveur kafka. | 1 | int |
consumerStreams | consumer | Nombre de consommateurs simultanés. | 10 | int |
groupId | consumer | Spécifie une chaîne qui identifie le groupe auquel appartient ce consommateur. La définition du même ID de groupe indique que plusieurs consommateurs sont tous dans le même groupe de consommateurs. Il s'agit d'une option obligatoire pour le consommateur. | String | |
maxPollRecords | consumer | poll()Spécifie le nombre maximal d'enregistrements renvoyés en un seul appel à. | 500 | Integer |
pollTimeoutMs | consumer | La valeur du délai (en millisecondes) utilisée lors de l'interrogation du KafkaConsumer. | 5000 | Long |
compressionCodec | producer | Ce paramètre vous permet de spécifier le codec de compression pour les données générées par le producteur. Les valeurs possibles sont none, gzip et snappy. Par défaut, il n'est pas compressé (aucun). | none | |
requestRequiredAcks | producer | Fixez les conditions pour considérer que la demande du producteur est terminée. Vous spécifierez le niveau de fiabilité du message. Il y a les trois valeurs suivantes à spécifier. acques=S'il est à 0, le message sera perdu si le courtier du leader tombe en panne, donc accepte normalement=Spécifiez 1 ou tous. ・ Acks=0producer considère que la transmission est terminée sans attendre un accusé de réception du serveur. Dans ce cas, nous ne pouvons garantir que kafka a reçu le message. Le décalage renvoyé pour chaque enregistrement est toujours-Réglez sur 1. ・ Acks=1 Le lecteur écrit le message localement, mais répond sans attendre la fin des autres abonnés. Dans ce cas, si le lecteur tombe en panne immédiatement après avoir approuvé le message, les autres abonnés ne pourront pas le répliquer et le message sera perdu. ・ Acks=all Attend la fin de toutes les autres écritures de réplica synchrone avant que le lecteur ne renvoie une réponse. Cela garantit que les messages ne sont pas perdus tant qu'au moins un réplica synchrone est actif. | 1 | String |
requestTimeoutMs | producer | Le courtier demande avant de renvoyer une erreur au client (producteur).required.Il est temps d'attendre avant de répondre aux exigences d'acks. | 305000 | Integer |
retries | producer | S'il est défini sur une valeur supérieure à 0, le client renvoie (réessaie) les enregistrements qui n'ont pas pu être envoyés en raison d'une erreur temporaire. | 0 | Integer |
Recommended Posts