[JAVA] Accédez à Apache Kafka avec Micronaut

Micronaut Kafka

Micronaut supporte Apache Kafka, j'ai donc décidé de l'essayer.

Kafka Support

Il semble que Micronaut puisse être utilisé pour créer des producteurs et des consommateurs Apache Kafka.

Micronaut Kafka

environnement

La version de Micronaut utilisée cette fois.

$ mn -V
| Micronaut Version: 1.0.4
| JVM Version: 1.8.0_191

Micronaut 1.0.4 semble prendre en charge Apache Kafka 2.0.1.

Upgrade to Kafka 2.0.1

Micronaut Kafka

Parallèlement à cela, j'ai téléchargé Apache Kafka 2.0.1.

$ wget http://ftp.tsukuba.wide.ad.jp/software/apache/kafka/2.0.1/kafka_2.12-2.0.1.tgz
$ tar xf kafka_2.12-2.0.1.tgz
$ cd kafka_2.12-2.0.1

Suivez le démarrage rapide d'Apache Kafka pour lancer Apache ZooKeeper et Apache Kafka Broker.

Quick Start / Start the server

## Apache Zookeeper
$ bin/zookeeper-server-start.sh config/zookeeper.properties

## Apache Kafka(Broker)
$ bin/kafka-server-start.sh config/server.properties

Le sujet a été créé avec le nom «my-topic».

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic my-topic
Created topic "my-topic".

Faire une demande

Créons maintenant une application. Créez un projet pour le producteur et le consommateur (ici, Auditeur).

## Producer
$ mn create-app --features kafka --build maven hello-kafka-producer

## Listener(Consumer)
$ mn create-app --features kafka --build maven hello-kafka-listener

Après cela, nous ajouterons et modifierons le code source de ces deux projets.

Faire un producteur

Écrivons maintenant un producteur qui envoie un message au courtier Apache Kafka.

$ cd hello-kafka-producer

Utilisez la classe main telle qu'elle est générée automatiquement.

src/main/java/hello/kafka/producer/Application.java

package hello.kafka.producer;

import io.micronaut.runtime.Micronaut;

public class Application {

    public static void main(String[] args) {
        Micronaut.run(Application.class);
    }
}

Le producteur semble créer une interface avec l'annotation @ KafkaClient.

Defining @KafkaClient Methods

Créé comme ça. Il n'est pas nécessaire de créer une classe d'implémentation pour l'interface.

src/main/java/hello/kafka/producer/MessageClient.java

package hello.kafka.producer;

import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.reactivex.Single;

@KafkaClient(acks = KafkaClient.Acknowledge.ALL)
public interface MessageClient {
    @Topic("my-topic")
    Single<String> send(@KafkaKey String key, Single<String> message);
}

Les réglages sont effectués à l'aide de l'annotation @ KafkaClient et du fichier de paramètres.

Dans l'annotation @ KafkaClient, seul l'ACK a été défini.

@KafkaClient(acks = KafkaClient.Acknowledge.ALL)
public interface MessageClient {

Fichier de configuration, ici.

src/main/resources/application.yml

---
micronaut:
    application:
        name: hello-kafka-producer

---
kafka:
    bootstrap:
        servers: localhost:9092
    producers:
        default:
            retries: 5

kafka.bootstrap.servers configure les paramètres de connexion pour Apache Kafka Broker.

Pour le comportement de Producer, utilisez kafka.producers. [Client-id].

En spécifiant ʻid, il est possible de définir en unités de @ KafkaClient`.

Per @KafkaClient Producer Properties

Cette fois, je n'ai pas spécifié «id», donc il est nommé «default».

Envoyez un message en utilisant la méthode avec le nom du sujet spécifié dans l'annotation @ Topic.

    @Topic("my-topic")
    Single<String> send(@KafkaKey String key, Single<String> message);

Si vous souhaitez spécifier une clé, utilisez @ KafkaKey. Si vous n'utilisez pas de clé, vous n'avez pas besoin de la spécifier (l'argument clé lui-même peut être omis).

Vous pouvez également utiliser des types autour des flux réactifs tels que RxJava, j'ai donc utilisé celui-ci cette fois.

Reactive and Non-Blocking Method Definitions

Enfin, @ Controller en utilisant @ KafkaClient.

src/main/java/hello/kafka/producer/MessageController.java

package hello.kafka.producer;

import javax.inject.Inject;

import io.micronaut.http.MediaType;
import io.micronaut.http.annotation.Body;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Post;
import io.reactivex.Single;

@Controller("/message")
public class MessageController {
    @Inject
    MessageClient messageClient;

    @Post(value = "/{key}", consumes = MediaType.TEXT_PLAIN, produces = MediaType.TEXT_PLAIN)
    public Single<String> message(String key, @Body Single<String> value) {
        return messageClient.send(key, value).map(v -> String.format("message [%s] sended", v));
    }
}

Construire.

$ ./mvnw package

Ceci complète le côté producteur.

Listener(Consumer)

Ensuite, le côté consommateur.

$ cd hello-kafka-listener

Kafka Consumers Using @KafkaListener

Les consommateurs sont créés à l'aide de l'annotation @ KafkaListener. C'est la définition de la classe.

src/main/java/hello/kafka/listener/MessageListener.java

package hello.kafka.listener;

import java.util.List;

import io.micronaut.configuration.kafka.Acknowledgement;
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.OffsetReset;
import io.micronaut.configuration.kafka.annotation.OffsetStrategy;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.reactivex.Single;

@KafkaListener(groupId = "message-group", offsetReset = OffsetReset.EARLIEST, offsetStrategy = OffsetStrategy.DISABLED)
public class MessageListener {
    @Topic("my-topic")
    public void receive(@KafkaKey String key, Single<String> message, Acknowledgement acknowledgement) {
        message.subscribe(m -> {
            System.out.printf("Received key / message = %s / %s%n", key, m);
            acknowledgement.ack();
        });
    }
}

Définissez le consommateur avec l'annotation @ KafkaListener et spécifiez l'annotation @ Topic pour la méthode qui reçoit le message.

@KafkaListener(groupId = "message-group", offsetReset = OffsetReset.EARLIEST, offsetStrategy = OffsetStrategy.DISABLED)
public class MessageListener {
    @Topic("my-topic")
    public void receive(@KafkaKey String key, Single<String> message, Acknowledgement acknowledgement) {

Le type à utiliser est Réactif.

Receiving and returning Reactive Types

J'essaye également d'utiliser ACK.

        message.subscribe(m -> {
            System.out.printf("Received key / message = %s / %s%n", key, m);
            acknowledgement.ack();
        });

Cette fois, le paramètre est uniquement la destination de la connexion à Broker. De plus, je vais démarrer Producer sur le même hôte, alors définissez micronaut.server.port sur autre chose que 8080.

src/main/resources/application.yml

---
micronaut:
    application:
        name: hello-kafka-listener
    server:
        port: 9080

---
kafka:
    bootstrap:
        servers: localhost:9092

C'est aussi une construction.

$ ./mvnw package

Le côté consommateur est maintenant prêt.

Essayez de bouger

Vérifions l'opération.

##Lancer le producteur
$ java -jar target/hello-kafka-producer-0.1.jar


##Commencer le consommateur
$ java -jar target/hello-kafka-listener-0.1.jar

J'essaierai de lancer un message à Producer, le cas échéant.

$ curl -H 'Content-Type: text/plain' localhost:8080/message/key1 -d 'value1'
message [value1] sended

$ curl -H 'Content-Type: text/plain' localhost:8080/message/key2 -d 'value2'
message [value2] sended

$ curl -H 'Content-Type: text/plain' localhost:8080/message/key3 -d 'value3'
message [value3] sended

Du côté du consommateur, le message reçu est affiché comme ceci.

Received key / message = key1 / value1
Received key / message = key2 / value2
Received key / message = key3 / value3

Il semble que vous puissiez le déplacer.

Recommended Posts

Accédez à Apache Kafka avec Micronaut
Essayez DI avec Micronaut
Hello World avec Micronaut
Fonction sans serveur avec Micronaut
Jusqu'à ce que j'essaye d'exécuter Apache Kafka avec une image docker
La coopération des messages a commencé avec l'édition Spring Boot Apache Kafka
Échantillon répété avec Apache Freemarker
Démarrez Apache Solr avec Embedded.
Sortie CSV par Apache Commons CSV
Manipuler Excel avec Apache POI
Étudiez Flilnk avec le code d'exercice Kafka
Ecrire un serveur réactif avec Micronaut
Modèle de disjoncteur avec Apache Camel
Téléchargez des fichiers volumineux avec Apache JMeter
Ajouter une validation de bean avec Micronaut (Java)
Accédez à la base de données en utilisant le référentiel avec Spring Data.
Redémarrez Apache avec l'image php-apache de Docker
FileUpload avec Rest sur Apache Wicket
Accès facile à la base de données avec Java Sql2o
[Linux] Démarrer le conteneur Apache avec Docker