[JAVA] Access Apache Kafka with Micronaut

Micronaut Kafka

Micronaut has Apache Kafka support, so I decided to give it a try.

Kafka Support

According to the documentation, Micronaut can be used to create both Apache Kafka producers and consumers.

Environment

This is the version of Micronaut used this time.

$ mn -V
| Micronaut Version: 1.0.4
| JVM Version: 1.8.0_191

Micronaut 1.0.4 seems to support Apache Kafka 2.0.1.

Upgrade to Kafka 2.0.1

To match that, I downloaded Apache Kafka 2.0.1.

$ wget http://ftp.tsukuba.wide.ad.jp/software/apache/kafka/2.0.1/kafka_2.12-2.0.1.tgz
$ tar xf kafka_2.12-2.0.1.tgz
$ cd kafka_2.12-2.0.1

Following Apache Kafka's Quick Start, start Apache ZooKeeper and the Apache Kafka broker.

Quick Start / Start the server

## Apache Zookeeper
$ bin/zookeeper-server-start.sh config/zookeeper.properties

## Apache Kafka(Broker)
$ bin/kafka-server-start.sh config/server.properties

I created a topic named my-topic with three partitions.

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic my-topic
Created topic "my-topic".

Create the applications

Now let's create the applications. Create one project for the producer and one for the consumer (called the listener here).

## Producer
$ mn create-app --features kafka --build maven hello-kafka-producer

## Listener(Consumer)
$ mn create-app --features kafka --build maven hello-kafka-listener

After that, we will add and edit the source code for these two projects.

Make a Producer

Now let's write a Producer that sends a message to the Apache Kafka Broker.

$ cd hello-kafka-producer

The automatically generated main class is used as is.

src/main/java/hello/kafka/producer/Application.java

package hello.kafka.producer;

import io.micronaut.runtime.Micronaut;

public class Application {

    public static void main(String[] args) {
        Micronaut.run(Application.class);
    }
}

A producer is defined as an interface annotated with @KafkaClient.

Defining @KafkaClient Methods

I wrote it like this. There is no need to create an implementation class for the interface.

src/main/java/hello/kafka/producer/MessageClient.java

package hello.kafka.producer;

import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.reactivex.Single;

@KafkaClient(acks = KafkaClient.Acknowledge.ALL)
public interface MessageClient {
    @Topic("my-topic")
    Single<String> send(@KafkaKey String key, Single<String> message);
}

The producer is configured through the @KafkaClient annotation and the configuration file.

In the @KafkaClient annotation, I set only the acks option.

@KafkaClient(acks = KafkaClient.Acknowledge.ALL)
public interface MessageClient {

The configuration file is here.

src/main/resources/application.yml

---
micronaut:
    application:
        name: hello-kafka-producer

---
kafka:
    bootstrap:
        servers: localhost:9092
    producers:
        default:
            retries: 5

kafka.bootstrap.servers sets the address of the Apache Kafka broker to connect to.

Producer behavior is configured under kafka.producers.[client-id].

By specifying id, you can configure each @KafkaClient separately.

Per @KafkaClient Producer Properties

This time I didn't specify id, so the settings go under the name default.
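To make the naming rule concrete, here is a small sketch in plain Java that builds the property prefix a given client id would be configured under. ProducerConfigKeys, prefixFor, and the id message-client are hypothetical names used only for illustration; Micronaut resolves these properties internally and this is not its actual code.

```java
// Hypothetical helper for illustration only; not part of Micronaut.
final class ProducerConfigKeys {

    private ProducerConfigKeys() {
    }

    // Returns the configuration prefix for a client id.
    // An empty or missing id maps to the name "default", as in this article.
    static String prefixFor(String clientId) {
        String name = (clientId == null || clientId.isEmpty()) ? "default" : clientId;
        return "kafka.producers." + name;
    }

    public static void main(String[] args) {
        // No id on @KafkaClient, as in MessageClient above.
        System.out.println(prefixFor("") + ".retries");
        // An assumed id, e.g. @KafkaClient(id = "message-client").
        System.out.println(prefixFor("message-client") + ".retries");
    }
}
```

So the retries: 5 setting in the application.yml above corresponds to the key kafka.producers.default.retries.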

Messages are sent by calling the method whose topic name is given in the @Topic annotation.

    @Topic("my-topic")
    Single<String> send(@KafkaKey String key, Single<String> message);

If you want to specify a key, use @KafkaKey. If you do not use a key, you do not need to specify it (the key argument itself can be omitted).

Reactive Streams types such as those from RxJava can also be used, so I used RxJava's Single this time.

Reactive and Non-Blocking Method Definitions

Finally, here is the @Controller that uses the @KafkaClient.

src/main/java/hello/kafka/producer/MessageController.java

package hello.kafka.producer;

import javax.inject.Inject;

import io.micronaut.http.MediaType;
import io.micronaut.http.annotation.Body;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Post;
import io.reactivex.Single;

@Controller("/message")
public class MessageController {
    @Inject
    MessageClient messageClient;

    @Post(value = "/{key}", consumes = MediaType.TEXT_PLAIN, produces = MediaType.TEXT_PLAIN)
    public Single<String> message(String key, @Body Single<String> value) {
        return messageClient.send(key, value).map(v -> String.format("message [%s] sended", v));
    }
}
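The map call in the controller builds the HTTP response from the value that the send eventually produces, without blocking while waiting for it. As a rough illustration of the same shape without RxJava, here is a sketch that swaps in the JDK's CompletableFuture. ReplySketch and reply are hypothetical names and are not part of the project.

```java
import java.util.concurrent.CompletableFuture;

// Illustration only: CompletableFuture stands in for RxJava's Single.
final class ReplySketch {

    private ReplySketch() {
    }

    // Mirrors the controller's map(...) step: format the reply
    // once the asynchronous send has produced its value.
    static CompletableFuture<String> reply(CompletableFuture<String> sent) {
        return sent.thenApply(v -> String.format("message [%s] sended", v));
    }

    public static void main(String[] args) {
        // An already completed future plays the role of a finished send.
        CompletableFuture<String> sent = CompletableFuture.completedFuture("value1");
        System.out.println(reply(sent).join()); // prints "message [value1] sended"
    }
}
```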

Build.

$ ./mvnw package

This completes the Producer side.

Make a Listener (Consumer)

Next is the consumer side.

$ cd hello-kafka-listener

Kafka Consumers Using @KafkaListener

Consumers are created using the @KafkaListener annotation. This is the class definition.

src/main/java/hello/kafka/listener/MessageListener.java

package hello.kafka.listener;

import java.util.List;

import io.micronaut.configuration.kafka.Acknowledgement;
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.OffsetReset;
import io.micronaut.configuration.kafka.annotation.OffsetStrategy;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.reactivex.Single;

@KafkaListener(groupId = "message-group", offsetReset = OffsetReset.EARLIEST, offsetStrategy = OffsetStrategy.DISABLED)
public class MessageListener {
    @Topic("my-topic")
    public void receive(@KafkaKey String key, Single<String> message, Acknowledgement acknowledgement) {
        message.subscribe(m -> {
            System.out.printf("Received key / message = %s / %s%n", key, m);
            acknowledgement.ack();
        });
    }
}

Configure the consumer with the @KafkaListener annotation, and put the @Topic annotation on the method that receives messages.

@KafkaListener(groupId = "message-group", offsetReset = OffsetReset.EARLIEST, offsetStrategy = OffsetStrategy.DISABLED)
public class MessageListener {
    @Topic("my-topic")
    public void receive(@KafkaKey String key, Single<String> message, Acknowledgement acknowledgement) {

The message is received as a reactive type.

Receiving and returning Reactive Types

I also tried acknowledging messages manually.

        message.subscribe(m -> {
            System.out.printf("Received key / message = %s / %s%n", key, m);
            acknowledgement.ack();
        });
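With offsetStrategy = OffsetStrategy.DISABLED, offsets are not committed automatically, so a record only counts as consumed once acknowledgement.ack() is called. The following plain Java sketch models that idea for a single partition. ManualAckModel and its methods are hypothetical names, and this is a simplified model, not the code of Micronaut or Kafka.

```java
// Simplified, hypothetical model of manual offset commits for one partition.
final class ManualAckModel {

    // Offset the consumer group would resume from after a restart.
    private long committedOffset = 0;

    // Hands a record to the application and returns a handle that commits it.
    Runnable deliver(long offset) {
        // Kafka convention: the committed value is the offset of the next record to read.
        return () -> committedOffset = Math.max(committedOffset, offset + 1);
    }

    long committedOffset() {
        return committedOffset;
    }

    public static void main(String[] args) {
        ManualAckModel partition = new ManualAckModel();
        Runnable ack0 = partition.deliver(0);
        Runnable ack1 = partition.deliver(1);
        ack0.run(); // only the first record is acknowledged
        System.out.println(partition.committedOffset()); // prints 1
    }
}
```

In this model, the record at offset 1 was delivered but never acknowledged, so a restarted consumer would receive it again.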

The only Kafka setting this time is the broker address. Because the producer runs on the same host, micronaut.server.port is set to something other than 8080.

src/main/resources/application.yml

---
micronaut:
    application:
        name: hello-kafka-listener
    server:
        port: 9080

---
kafka:
    bootstrap:
        servers: localhost:9092

Build this one as well.

$ ./mvnw package

The Consumer side is now ready.

Try it out

Let's check the operation.

## Launch Producer
$ java -jar target/hello-kafka-producer-0.1.jar

## Start Consumer
$ java -jar target/hello-kafka-listener-0.1.jar

Send a few messages to the producer.

$ curl -H 'Content-Type: text/plain' localhost:8080/message/key1 -d 'value1'
message [value1] sended

$ curl -H 'Content-Type: text/plain' localhost:8080/message/key2 -d 'value2'
message [value2] sended

$ curl -H 'Content-Type: text/plain' localhost:8080/message/key3 -d 'value3'
message [value3] sended

On the Consumer side, the received message is displayed like this.

Received key / message = key1 / value1
Received key / message = key2 / value2
Received key / message = key3 / value3

It seems to work.
