Micronaut Kafka
Micronaut hat Unterstützung für Apache Kafka, deshalb habe ich beschlossen, es auszuprobieren.
Es scheint, dass Micronaut verwendet werden kann, um Apache Kafka Produzenten und Konsumenten zu erschaffen.
Die diesmal verwendete Version von Micronaut.
$ mn -V
| Micronaut Version: 1.0.4
| JVM Version: 1.8.0_191
Micronaut 1.0.4 scheint Apache Kafka 2.0.1 zu unterstützen.
Upgrade to Kafka 2.0.1
Zusammen mit diesem habe ich Apache Kafka 2.0.1 heruntergeladen.
$ wget http://ftp.tsukuba.wide.ad.jp/software/apache/kafka/2.0.1/kafka_2.12-2.0.1.tgz
$ tar xf kafka_2.12-2.0.1.tgz
$ cd kafka_2.12-2.0.1
Folgen Sie dem Schnellstart von Apache Kafka, um Apache ZooKeeper und Apache Kafka Broker zu starten.
Quick Start / Start the server
## Apache Zookeeper
$ bin/zookeeper-server-start.sh config/zookeeper.properties
## Apache Kafka(Broker)
$ bin/kafka-server-start.sh config/server.properties
Das Thema wurde mit dem Namen "Mein Thema" erstellt.
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic my-topic
Created topic "my-topic".
Jetzt erstellen wir eine Anwendung. Erstellen Sie ein Projekt für Produzent und Konsument (hier Listener).
## Producer
$ mn create-app --features kafka --build maven hello-kafka-producer
## Listener(Consumer)
$ mn create-app --features kafka --build maven hello-kafka-listener
Danach werden wir den Quellcode für diese beiden Projekte hinzufügen und bearbeiten.
Schreiben wir nun einen Produzenten, der eine Nachricht an den Apache Kafka Broker sendet.
$ cd hello-kafka-producer
Verwenden Sie die Hauptklasse, da sie automatisch generiert wird.
src/main/java/hello/kafka/producer/Application.java
package hello.kafka.producer;
import io.micronaut.runtime.Micronaut;
public class Application {
public static void main(String[] args) {
Micronaut.run(Application.class);
}
}
Der Produzent scheint eine Schnittstelle mit der Annotation "@ KafkaClient" zu erstellen.
So erstellt. Es ist nicht erforderlich, eine Implementierungsklasse für die Schnittstelle zu erstellen.
src/main/java/hello/kafka/producer/MessageClient.java
package hello.kafka.producer;
import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.reactivex.Single;
@KafkaClient(acks = KafkaClient.Acknowledge.ALL)
public interface MessageClient {
@Topic("my-topic")
Single<String> send(@KafkaKey String key, Single<String> message);
}
Die Einstellungen werden mithilfe der Anmerkung "@ KafkaClient" und der Einstellungsdatei vorgenommen.
In der Annotation "@ KafkaClient" wurde nur die ACK festgelegt.
@KafkaClient(acks = KafkaClient.Acknowledge.ALL)
public interface MessageClient {
Datei hier einstellen.
src/main/resources/application.yml
---
micronaut:
application:
name: hello-kafka-producer
---
kafka:
bootstrap:
servers: localhost:9092
producers:
default:
retries: 5
kafka.bootstrap.servers
konfiguriert die Verbindungseinstellungen für Apache Kafka Broker.
Verwenden Sie für das Verhalten von Producer kafka.producers. [Client-id]
.
Durch Angabe von "id" ist es möglich, Einheiten von "@ KafkaClient" festzulegen.
Per @KafkaClient Producer Properties
Dieses Mal habe ich nicht "id" angegeben, daher heißt es "default".
Senden Sie eine Nachricht mit der Methode mit dem in der Anmerkung "@ Topic" angegebenen Themennamen.
@Topic("my-topic")
Single<String> send(@KafkaKey String key, Single<String> message);
Wenn Sie einen Schlüssel angeben möchten, verwenden Sie "@ KafkaKey". Wenn Sie keinen Schlüssel verwenden, müssen Sie ihn nicht angeben (das Schlüsselargument selbst kann weggelassen werden).
Sie können auch Typen für reaktive Streams wie RxJava verwenden, daher habe ich diesen diesmal verwendet.
Reactive and Non-Blocking Method Definitions
Schließlich "@ Controller" mit "@ KafkaClient".
src/main/java/hello/kafka/producer/MessageController.java
package hello.kafka.producer;
import javax.inject.Inject;
import io.micronaut.http.MediaType;
import io.micronaut.http.annotation.Body;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Post;
import io.reactivex.Single;
@Controller("/message")
public class MessageController {
@Inject
MessageClient messageClient;
@Post(value = "/{key}", consumes = MediaType.TEXT_PLAIN, produces = MediaType.TEXT_PLAIN)
public Single<String> message(String key, @Body Single<String> value) {
return messageClient.send(key, value).map(v -> String.format("message [%s] sended", v));
}
}
Bauen.
$ ./mvnw package
Dies vervollständigt die Produzentenseite.
Listener(Consumer)
Als nächstes die Verbraucherseite.
$ cd hello-kafka-listener
Kafka Consumers Using @KafkaListener
Verbraucher werden mit der Annotation @ KafkaListener
erstellt. Dies ist die "Klassen" -Definition.
src/main/java/hello/kafka/listener/MessageListener.java
package hello.kafka.listener;
import java.util.List;
import io.micronaut.configuration.kafka.Acknowledgement;
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.OffsetReset;
import io.micronaut.configuration.kafka.annotation.OffsetStrategy;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.reactivex.Single;
@KafkaListener(groupId = "message-group", offsetReset = OffsetReset.EARLIEST, offsetStrategy = OffsetStrategy.DISABLED)
public class MessageListener {
@Topic("my-topic")
public void receive(@KafkaKey String key, Single<String> message, Acknowledgement acknowledgement) {
message.subscribe(m -> {
System.out.printf("Received key / message = %s / %s%n", key, m);
acknowledgement.ack();
});
}
}
Stellen Sie den Consumer mit der Annotation "@ KafkaListener" ein und geben Sie die Annotation "@ Topic" für die Methode an, die die Nachricht empfängt.
@KafkaListener(groupId = "message-group", offsetReset = OffsetReset.EARLIEST, offsetStrategy = OffsetStrategy.DISABLED)
public class MessageListener {
@Topic("my-topic")
public void receive(@KafkaKey String key, Single<String> message, Acknowledgement acknowledgement) {
Der zu verwendende Typ ist Reaktiv.
Receiving and returning Reactive Types
Ich versuche auch, ACK zu verwenden.
message.subscribe(m -> {
System.out.printf("Received key / message = %s / %s%n", key, m);
acknowledgement.ack();
});
Diesmal ist die Einstellung nur das Verbindungsziel zum Broker. Außerdem werde ich Producer auf demselben Host starten, also setze micronaut.server.port
auf etwas anderes als 8080
.
src/main/resources/application.yml
---
micronaut:
application:
name: hello-kafka-listener
server:
port: 9080
---
kafka:
bootstrap:
servers: localhost:9092
Dies ist auch ein Build.
$ ./mvnw package
Die Verbraucherseite ist jetzt bereit.
Lassen Sie uns die Operation überprüfen.
##Producer starten
$ java -jar target/hello-kafka-producer-0.1.jar
##Starten Sie Consumer
$ java -jar target/hello-kafka-listener-0.1.jar
Ich werde versuchen, eine Nachricht entsprechend an Producer zu senden.
$ curl -H 'Content-Type: text/plain' localhost:8080/message/key1 -d 'value1'
message [value1] sended
$ curl -H 'Content-Type: text/plain' localhost:8080/message/key2 -d 'value2'
message [value2] sended
$ curl -H 'Content-Type: text/plain' localhost:8080/message/key3 -d 'value3'
message [value3] sended
Auf der Verbraucherseite wird die empfangene Nachricht folgendermaßen angezeigt.
Received key / message = key1 / value1
Received key / message = key2 / value2
Received key / message = key3 / value3
Es scheint, dass Sie es bewegen könnten.
Recommended Posts