Micronaut Kafka
Micronaut supporte Apache Kafka, j'ai donc décidé de l'essayer.
Il semble que Micronaut puisse être utilisé pour créer des producteurs et des consommateurs Apache Kafka.
La version de Micronaut utilisée cette fois.
$ mn -V
| Micronaut Version: 1.0.4
| JVM Version: 1.8.0_191
Micronaut 1.0.4 semble prendre en charge Apache Kafka 2.0.1.
Upgrade to Kafka 2.0.1
Parallèlement à cela, j'ai téléchargé Apache Kafka 2.0.1.
$ wget http://ftp.tsukuba.wide.ad.jp/software/apache/kafka/2.0.1/kafka_2.12-2.0.1.tgz
$ tar xf kafka_2.12-2.0.1.tgz
$ cd kafka_2.12-2.0.1
Suivez le démarrage rapide d'Apache Kafka pour lancer Apache ZooKeeper et Apache Kafka Broker.
Quick Start / Start the server
## Apache Zookeeper
$ bin/zookeeper-server-start.sh config/zookeeper.properties
## Apache Kafka(Broker)
$ bin/kafka-server-start.sh config/server.properties
Le sujet a été créé avec le nom «my-topic».
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic my-topic
Created topic "my-topic".
Créons maintenant une application. Créez un projet pour le producteur et le consommateur (ici, Auditeur).
## Producer
$ mn create-app --features kafka --build maven hello-kafka-producer
## Listener(Consumer)
$ mn create-app --features kafka --build maven hello-kafka-listener
Après cela, nous ajouterons et modifierons le code source de ces deux projets.
Écrivons maintenant un producteur qui envoie un message au courtier Apache Kafka.
$ cd hello-kafka-producer
Utilisez la classe main
telle qu'elle est générée automatiquement.
src/main/java/hello/kafka/producer/Application.java
package hello.kafka.producer;
import io.micronaut.runtime.Micronaut;
public class Application {
public static void main(String[] args) {
Micronaut.run(Application.class);
}
}
Le producteur semble créer une interface avec l'annotation @ KafkaClient
.
Créé comme ça. Il n'est pas nécessaire de créer une classe d'implémentation pour l'interface.
src/main/java/hello/kafka/producer/MessageClient.java
package hello.kafka.producer;
import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.reactivex.Single;
@KafkaClient(acks = KafkaClient.Acknowledge.ALL)
public interface MessageClient {
@Topic("my-topic")
Single<String> send(@KafkaKey String key, Single<String> message);
}
Les réglages sont effectués à l'aide de l'annotation @ KafkaClient
et du fichier de paramètres.
Dans l'annotation @ KafkaClient
, seul l'ACK a été défini.
@KafkaClient(acks = KafkaClient.Acknowledge.ALL)
public interface MessageClient {
Fichier de configuration, ici.
src/main/resources/application.yml
---
micronaut:
application:
name: hello-kafka-producer
---
kafka:
bootstrap:
servers: localhost:9092
producers:
default:
retries: 5
kafka.bootstrap.servers
configure les paramètres de connexion pour Apache Kafka Broker.
Pour le comportement de Producer, utilisez kafka.producers. [Client-id]
.
En spécifiant ʻid, il est possible de définir en unités de
@ KafkaClient`.
Per @KafkaClient Producer Properties
Cette fois, je n'ai pas spécifié «id», donc il est nommé «default».
Envoyez un message en utilisant la méthode avec le nom du sujet spécifié dans l'annotation @ Topic
.
@Topic("my-topic")
Single<String> send(@KafkaKey String key, Single<String> message);
Si vous souhaitez spécifier une clé, utilisez @ KafkaKey
. Si vous n'utilisez pas de clé, vous n'avez pas besoin de la spécifier (l'argument clé lui-même peut être omis).
Vous pouvez également utiliser des types autour des flux réactifs tels que RxJava, j'ai donc utilisé celui-ci cette fois.
Reactive and Non-Blocking Method Definitions
Enfin, @ Controller
en utilisant @ KafkaClient
.
src/main/java/hello/kafka/producer/MessageController.java
package hello.kafka.producer;
import javax.inject.Inject;
import io.micronaut.http.MediaType;
import io.micronaut.http.annotation.Body;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Post;
import io.reactivex.Single;
@Controller("/message")
public class MessageController {
@Inject
MessageClient messageClient;
@Post(value = "/{key}", consumes = MediaType.TEXT_PLAIN, produces = MediaType.TEXT_PLAIN)
public Single<String> message(String key, @Body Single<String> value) {
return messageClient.send(key, value).map(v -> String.format("message [%s] sended", v));
}
}
Construire.
$ ./mvnw package
Ceci complète le côté producteur.
Listener(Consumer)
Ensuite, le côté consommateur.
$ cd hello-kafka-listener
Kafka Consumers Using @KafkaListener
Les consommateurs sont créés à l'aide de l'annotation @ KafkaListener
. C'est la définition de la classe
.
src/main/java/hello/kafka/listener/MessageListener.java
package hello.kafka.listener;
import java.util.List;
import io.micronaut.configuration.kafka.Acknowledgement;
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.OffsetReset;
import io.micronaut.configuration.kafka.annotation.OffsetStrategy;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.reactivex.Single;
@KafkaListener(groupId = "message-group", offsetReset = OffsetReset.EARLIEST, offsetStrategy = OffsetStrategy.DISABLED)
public class MessageListener {
@Topic("my-topic")
public void receive(@KafkaKey String key, Single<String> message, Acknowledgement acknowledgement) {
message.subscribe(m -> {
System.out.printf("Received key / message = %s / %s%n", key, m);
acknowledgement.ack();
});
}
}
Définissez le consommateur avec l'annotation @ KafkaListener
et spécifiez l'annotation @ Topic
pour la méthode qui reçoit le message.
@KafkaListener(groupId = "message-group", offsetReset = OffsetReset.EARLIEST, offsetStrategy = OffsetStrategy.DISABLED)
public class MessageListener {
@Topic("my-topic")
public void receive(@KafkaKey String key, Single<String> message, Acknowledgement acknowledgement) {
Le type à utiliser est Réactif.
Receiving and returning Reactive Types
J'essaye également d'utiliser ACK.
message.subscribe(m -> {
System.out.printf("Received key / message = %s / %s%n", key, m);
acknowledgement.ack();
});
Cette fois, le paramètre est uniquement la destination de la connexion à Broker. De plus, je vais démarrer Producer sur le même hôte, alors définissez micronaut.server.port
sur autre chose que 8080
.
src/main/resources/application.yml
---
micronaut:
application:
name: hello-kafka-listener
server:
port: 9080
---
kafka:
bootstrap:
servers: localhost:9092
C'est aussi une construction.
$ ./mvnw package
Le côté consommateur est maintenant prêt.
Vérifions l'opération.
##Lancer le producteur
$ java -jar target/hello-kafka-producer-0.1.jar
##Commencer le consommateur
$ java -jar target/hello-kafka-listener-0.1.jar
J'essaierai de lancer un message à Producer, le cas échéant.
$ curl -H 'Content-Type: text/plain' localhost:8080/message/key1 -d 'value1'
message [value1] sended
$ curl -H 'Content-Type: text/plain' localhost:8080/message/key2 -d 'value2'
message [value2] sended
$ curl -H 'Content-Type: text/plain' localhost:8080/message/key3 -d 'value3'
message [value3] sended
Du côté du consommateur, le message reçu est affiché comme ceci.
Received key / message = key1 / value1
Received key / message = key2 / value2
Received key / message = key3 / value3
Il semble que vous puissiez le déplacer.
Recommended Posts