[JAVA] Greifen Sie mit Micronaut auf Apache Kafka zu

Micronaut Kafka

Micronaut hat Unterstützung für Apache Kafka, deshalb habe ich beschlossen, es auszuprobieren.

Kafka Support

Es scheint, dass Micronaut verwendet werden kann, um Apache Kafka Produzenten und Konsumenten zu erschaffen.

Micronaut Kafka

Umgebung

Die diesmal verwendete Version von Micronaut.

$ mn -V
| Micronaut Version: 1.0.4
| JVM Version: 1.8.0_191

Micronaut 1.0.4 scheint Apache Kafka 2.0.1 zu unterstützen.

Upgrade to Kafka 2.0.1

Micronaut Kafka

Zusammen mit diesem habe ich Apache Kafka 2.0.1 heruntergeladen.

$ wget http://ftp.tsukuba.wide.ad.jp/software/apache/kafka/2.0.1/kafka_2.12-2.0.1.tgz
$ tar xf kafka_2.12-2.0.1.tgz
$ cd kafka_2.12-2.0.1

Folgen Sie dem Schnellstart von Apache Kafka, um Apache ZooKeeper und Apache Kafka Broker zu starten.

Quick Start / Start the server

## Apache Zookeeper
$ bin/zookeeper-server-start.sh config/zookeeper.properties

## Apache Kafka(Broker)
$ bin/kafka-server-start.sh config/server.properties

Das Thema wurde mit dem Namen "Mein Thema" erstellt.

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic my-topic
Created topic "my-topic".

Sich bewerben

Jetzt erstellen wir eine Anwendung. Erstellen Sie ein Projekt für Produzent und Konsument (hier Listener).

## Producer
$ mn create-app --features kafka --build maven hello-kafka-producer

## Listener(Consumer)
$ mn create-app --features kafka --build maven hello-kafka-listener

Danach werden wir den Quellcode für diese beiden Projekte hinzufügen und bearbeiten.

Machen Sie einen Produzenten

Schreiben wir nun einen Produzenten, der eine Nachricht an den Apache Kafka Broker sendet.

$ cd hello-kafka-producer

Verwenden Sie die Hauptklasse, da sie automatisch generiert wird.

src/main/java/hello/kafka/producer/Application.java

package hello.kafka.producer;

import io.micronaut.runtime.Micronaut;

public class Application {

    public static void main(String[] args) {
        Micronaut.run(Application.class);
    }
}

Der Produzent scheint eine Schnittstelle mit der Annotation "@ KafkaClient" zu erstellen.

Defining @KafkaClient Methods

So erstellt. Es ist nicht erforderlich, eine Implementierungsklasse für die Schnittstelle zu erstellen.

src/main/java/hello/kafka/producer/MessageClient.java

package hello.kafka.producer;

import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.reactivex.Single;

@KafkaClient(acks = KafkaClient.Acknowledge.ALL)
public interface MessageClient {
    @Topic("my-topic")
    Single<String> send(@KafkaKey String key, Single<String> message);
}

Die Einstellungen werden mithilfe der Anmerkung "@ KafkaClient" und der Einstellungsdatei vorgenommen.

In der Annotation "@ KafkaClient" wurde nur die ACK festgelegt.

@KafkaClient(acks = KafkaClient.Acknowledge.ALL)
public interface MessageClient {

Datei hier einstellen.

src/main/resources/application.yml

---
micronaut:
    application:
        name: hello-kafka-producer

---
kafka:
    bootstrap:
        servers: localhost:9092
    producers:
        default:
            retries: 5

kafka.bootstrap.servers konfiguriert die Verbindungseinstellungen für Apache Kafka Broker.

Verwenden Sie für das Verhalten von Producer kafka.producers. [Client-id].

Durch Angabe von "id" ist es möglich, Einheiten von "@ KafkaClient" festzulegen.

Per @KafkaClient Producer Properties

Dieses Mal habe ich nicht "id" angegeben, daher heißt es "default".

Senden Sie eine Nachricht mit der Methode mit dem in der Anmerkung "@ Topic" angegebenen Themennamen.

    @Topic("my-topic")
    Single<String> send(@KafkaKey String key, Single<String> message);

Wenn Sie einen Schlüssel angeben möchten, verwenden Sie "@ KafkaKey". Wenn Sie keinen Schlüssel verwenden, müssen Sie ihn nicht angeben (das Schlüsselargument selbst kann weggelassen werden).

Sie können auch Typen für reaktive Streams wie RxJava verwenden, daher habe ich diesen diesmal verwendet.

Reactive and Non-Blocking Method Definitions

Schließlich "@ Controller" mit "@ KafkaClient".

src/main/java/hello/kafka/producer/MessageController.java

package hello.kafka.producer;

import javax.inject.Inject;

import io.micronaut.http.MediaType;
import io.micronaut.http.annotation.Body;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Post;
import io.reactivex.Single;

@Controller("/message")
public class MessageController {
    @Inject
    MessageClient messageClient;

    @Post(value = "/{key}", consumes = MediaType.TEXT_PLAIN, produces = MediaType.TEXT_PLAIN)
    public Single<String> message(String key, @Body Single<String> value) {
        return messageClient.send(key, value).map(v -> String.format("message [%s] sended", v));
    }
}

Bauen.

$ ./mvnw package

Dies vervollständigt die Produzentenseite.

Listener(Consumer)

Als nächstes die Verbraucherseite.

$ cd hello-kafka-listener

Kafka Consumers Using @KafkaListener

Verbraucher werden mit der Annotation @ KafkaListener erstellt. Dies ist die "Klassen" -Definition.

src/main/java/hello/kafka/listener/MessageListener.java

package hello.kafka.listener;

import java.util.List;

import io.micronaut.configuration.kafka.Acknowledgement;
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.OffsetReset;
import io.micronaut.configuration.kafka.annotation.OffsetStrategy;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.reactivex.Single;

@KafkaListener(groupId = "message-group", offsetReset = OffsetReset.EARLIEST, offsetStrategy = OffsetStrategy.DISABLED)
public class MessageListener {
    @Topic("my-topic")
    public void receive(@KafkaKey String key, Single<String> message, Acknowledgement acknowledgement) {
        message.subscribe(m -> {
            System.out.printf("Received key / message = %s / %s%n", key, m);
            acknowledgement.ack();
        });
    }
}

Stellen Sie den Consumer mit der Annotation "@ KafkaListener" ein und geben Sie die Annotation "@ Topic" für die Methode an, die die Nachricht empfängt.

@KafkaListener(groupId = "message-group", offsetReset = OffsetReset.EARLIEST, offsetStrategy = OffsetStrategy.DISABLED)
public class MessageListener {
    @Topic("my-topic")
    public void receive(@KafkaKey String key, Single<String> message, Acknowledgement acknowledgement) {

Der zu verwendende Typ ist Reaktiv.

Receiving and returning Reactive Types

Ich versuche auch, ACK zu verwenden.

        message.subscribe(m -> {
            System.out.printf("Received key / message = %s / %s%n", key, m);
            acknowledgement.ack();
        });

Diesmal ist die Einstellung nur das Verbindungsziel zum Broker. Außerdem werde ich Producer auf demselben Host starten, also setze micronaut.server.port auf etwas anderes als 8080.

src/main/resources/application.yml

---
micronaut:
    application:
        name: hello-kafka-listener
    server:
        port: 9080

---
kafka:
    bootstrap:
        servers: localhost:9092

Dies ist auch ein Build.

$ ./mvnw package

Die Verbraucherseite ist jetzt bereit.

Versuche dich zu bewegen

Lassen Sie uns die Operation überprüfen.

##Producer starten
$ java -jar target/hello-kafka-producer-0.1.jar


##Starten Sie Consumer
$ java -jar target/hello-kafka-listener-0.1.jar

Ich werde versuchen, eine Nachricht entsprechend an Producer zu senden.

$ curl -H 'Content-Type: text/plain' localhost:8080/message/key1 -d 'value1'
message [value1] sended

$ curl -H 'Content-Type: text/plain' localhost:8080/message/key2 -d 'value2'
message [value2] sended

$ curl -H 'Content-Type: text/plain' localhost:8080/message/key3 -d 'value3'
message [value3] sended

Auf der Verbraucherseite wird die empfangene Nachricht folgendermaßen angezeigt.

Received key / message = key1 / value1
Received key / message = key2 / value2
Received key / message = key3 / value3

Es scheint, dass Sie es bewegen könnten.

Recommended Posts

Greifen Sie mit Micronaut auf Apache Kafka zu
Versuchen Sie DI mit Micronaut
Hallo Welt mit Micronaut
Serverlose Funktion mit Micronaut
Bis ich versuche, Apache Kafka mit Docker-Image auszuführen
Die Nachrichtenkooperation begann mit der Spring Boot Apache Kafka Edition
Wiederholte Probe mit Apache Freemarker
Starten Sie Apache Solr mit Embedded.
Bearbeiten Sie Excel mit Apache POI
Studiere Flilnk mit dem Kafka-Übungscode
Schreiben Sie einen reaktiven Server mit Micronaut
Leistungsschaltermuster mit Apache-Kamel
Laden Sie große Dateien mit Apache JMeter herunter
Bean Validation mit Micronaut (Java) hinzufügen
Greifen Sie über das Repository mit Spring Data auf die Datenbank zu.
Starten Sie Apache mit dem PHP-Apache-Image des Dockers neu
FileUpload mit Rest auf Apache Wicket
Einfacher Datenbankzugriff mit Java Sql2o
[Linux] Starten Sie den Apache-Container mit Docker