Micronaut Kafka
Micronaut has Apache Kafka support, so I decided to give it a try.
It seems that you can use Micronaut to create Apache Kafka Producer and Consumer.
The version of Micronaut used this time.
$ mn -V
| Micronaut Version: 1.0.4
| JVM Version: 1.8.0_191
Micronaut 1.0.4 seems to support Apache Kafka 2.0.1.
Upgrade to Kafka 2.0.1
Along with this, I downloaded Apache Kafka 2.0.1.
$ wget http://ftp.tsukuba.wide.ad.jp/software/apache/kafka/2.0.1/kafka_2.12-2.0.1.tgz
$ tar xf kafka_2.12-2.0.1.tgz
$ cd kafka_2.12-2.0.1
Follow Apache Kafka's Quick Start to start Apache ZooKeeper and Apache Kafka Broker.
Quick Start / Start the server
## Apache Zookeeper
$ bin/zookeeper-server-start.sh config/zookeeper.properties
## Apache Kafka(Broker)
$ bin/kafka-server-start.sh config/server.properties
Topic was created with the name my-topic
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic my-topic
Created topic "my-topic".
Now let's create an application. Create a project for Producer and Consumer (here, Listener).
## Producer
$ mn create-app --features kafka --build maven hello-kafka-producer
## Listener(Consumer)
$ mn create-app --features kafka --build maven hello-kafka-listener
After that, we will add and edit the source code for these two projects.
Now let's write a Producer that sends a message to the Apache Kafka Broker.
$ cd hello-kafka-producer
Use the main
class as it is automatically generated.
package hello.kafka.producer;
import io.micronaut.runtime.Micronaut;
public class Application {
public static void main(String[] args) {
Producer seems to create an interface with the @KafkaClient
Created like this. There is no need to create an implementation class for the interface.
package hello.kafka.producer;
import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.reactivex.Single;
@KafkaClient(acks = KafkaClient.Acknowledge.ALL)
public interface MessageClient {
Single<String> send(@KafkaKey String key, Single<String> message);
Settings are made with the @KafkaClient
annotation and the configuration file.
In the @KafkaClient
annotation, only the ACK was set.
@KafkaClient(acks = KafkaClient.Acknowledge.ALL)
public interface MessageClient {
Configuration file, here.
name: hello-kafka-producer
servers: localhost:9092
retries: 5
configures the connection settings for Apache Kafka Broker.
For the behavior of Producer, use kafka.producers. [Client-id]
By specifying ʻid, it is possible to set in units of
@ KafkaClient`.
Per @KafkaClient Producer Properties
This time, I didn't specify ʻid, so it's named
Send a message using the method that specifies the topic name in the @ Topic
Single<String> send(@KafkaKey String key, Single<String> message);
If you want to specify a key, use @KafkaKey
. If you do not use a key, you do not need to specify it (the key argument itself can be omitted).
You can also use types around Reactive Streams such as RxJava, so I used this one this time.
Reactive and Non-Blocking Method Definitions
Finally, @Controller
using @ KafkaClient
package hello.kafka.producer;
import javax.inject.Inject;
import io.micronaut.http.MediaType;
import io.micronaut.http.annotation.Body;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Post;
import io.reactivex.Single;
public class MessageController {
MessageClient messageClient;
@Post(value = "/{key}", consumes = MediaType.TEXT_PLAIN, produces = MediaType.TEXT_PLAIN)
public Single<String> message(String key, @Body Single<String> value) {
return messageClient.send(key, value).map(v -> String.format("message [%s] sended", v));
$ ./mvnw package
This completes the Producer side.
Next, the Consumer side.
$ cd hello-kafka-listener
Kafka Consumers Using @KafkaListener
Consumers are created using the @KafkaListener
annotation. This is the class
package hello.kafka.listener;
import java.util.List;
import io.micronaut.configuration.kafka.Acknowledgement;
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.OffsetReset;
import io.micronaut.configuration.kafka.annotation.OffsetStrategy;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.reactivex.Single;
@KafkaListener(groupId = "message-group", offsetReset = OffsetReset.EARLIEST, offsetStrategy = OffsetStrategy.DISABLED)
public class MessageListener {
public void receive(@KafkaKey String key, Single<String> message, Acknowledgement acknowledgement) {
message.subscribe(m -> {
System.out.printf("Received key / message = %s / %s%n", key, m);
Set the Consumer with the @KafkaListener
annotation and specify the @ Topic
annotation for the method that receives the message.
@KafkaListener(groupId = "message-group", offsetReset = OffsetReset.EARLIEST, offsetStrategy = OffsetStrategy.DISABLED)
public class MessageListener {
public void receive(@KafkaKey String key, Single<String> message, Acknowledgement acknowledgement) {
The type to use is Reactive.
Receiving and returning Reactive Types
I also try to use ACK.
message.subscribe(m -> {
System.out.printf("Received key / message = %s / %s%n", key, m);
This time, the setting is only the connection destination to Broker. Also, I will start Producer on the same host, so set micronaut.server.port
to something other than 8080
name: hello-kafka-listener
port: 9080
servers: localhost:9092
This is also a build.
$ ./mvnw package
The Consumer side is now ready.
Let's check the operation.
##Launch Producer
$ java -jar target/hello-kafka-producer-0.1.jar
##Start Consumer
$ java -jar target/hello-kafka-listener-0.1.jar
I will try to throw a message into Producer as appropriate.
$ curl -H 'Content-Type: text/plain' localhost:8080/message/key1 -d 'value1'
message [value1] sended
$ curl -H 'Content-Type: text/plain' localhost:8080/message/key2 -d 'value2'
message [value2] sended
$ curl -H 'Content-Type: text/plain' localhost:8080/message/key3 -d 'value3'
message [value3] sended
On the Consumer side, the received message is displayed like this.
Received key / message = key1 / value1
Received key / message = key2 / value2
Received key / message = key3 / value3
It seems that you could move it.
Recommended Posts