Send and receive simple messages using the Kafka component of the Apache Camel framework (Java DSL)

Introduction

Apache Kafka is an open-source distributed messaging system that lets you publish and subscribe to large volumes of messages at high speed. In this article, we will create an application that exchanges messages with Kafka using Apache Camel, an integration framework. The camel-kafka component is used to connect to Kafka from Apache Camel.

This environment uses Kafka v2.0.0. See the following article for how to set it up.

-Apache Kafka v2.0.0 has been released, so try installing it

I previously wrote a similar application in XML DSL in the article below, but this time I will build it with the Java DSL, and I would like to describe it in more detail than last time.

-Send and receive simple messages using the Kafka component of the Apache Camel framework

Description of the application to be created

The application we will create publishes a date-and-time message to Kafka every second and subscribes to the same messages from Kafka, as illustrated in the figure.

(Figure: a timer-triggered producer route publishes the current time to a Kafka topic every second, and a consumer route subscribes to the same topic.)

With Apache Camel, you can implement such a Kafka application in just the few dozen lines of code below. It is not the prettiest code, since everything is in the main method, but I hope it shows how easily this can be implemented with Camel.

	public static void main(String[] args) {
		try {
			CamelContext context = new DefaultCamelContext();
			KafkaComponent kafka = new KafkaComponent();
			kafka.setBrokers("kafka1:9092,kafka2:9092,kafka3:9092"); // Register the three servers kafka1, kafka2, and kafka3 as brokers
			context.addComponent("kafka", kafka);

			context.addRoutes(new RouteBuilder() {
				public void configure() {
					from("timer:trigger?period=1000") //Run every 1000 milliseconds
							.routeId("kafka_producer_route")
							.setBody(simple("${date:now:yyyy-MM-dd HH:mm:ss}")) //Set the current date and time for the BODY of the message
							.to("kafka:test01"); //Publish message to topic test01

					from("kafka:test01") //Subscribe message from topic test01
							.routeId("kafka_consumer_route")
							.log("body = ${body}"); //Display the BODY content of the message in the log
				}
			});

			context.start();
			Thread.sleep(10000);
			context.stop();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

Now, let's create the Kafka application.

Add the libraries for using Kafka

For Maven, add the following dependencies to pom.xml. camel-kafka is the component for working with Kafka in Camel, and ${camel.version} specifies the version of Camel you are using. kafka-clients is the client library for Kafka.

pom.xml


		<dependency>
			<groupId>org.apache.camel</groupId>
			<artifactId>camel-kafka</artifactId>
			<version>${camel.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-clients</artifactId>
			<version>2.0.0</version>
		</dependency>

Component URI

The URI for using the camel-kafka component looks like this:

kafka:topic[?options]

Specify the target topic name in the context path (topic).
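
Options are appended to the URI as query parameters. For example, the connection settings used later in this article could also be written directly on the endpoint (an illustrative URI; the option names are those of the camel-kafka component):

```
kafka:test01?brokers=kafka1:9092,kafka2:9092,kafka3:9092&groupId=group1&autoOffsetReset=earliest
```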

Create a route to publish / subscribe messages

First, create a route that publishes messages to Kafka. The route written in the Java DSL is as follows.

The topic is created automatically, but if you want to change the topic settings (such as the number of partitions), create it manually beforehand. The code is simple enough that it needs little explanation.

				from("timer:trigger?period=1000") //Run every 1000 milliseconds
						.routeId("kafka_producer_route")
						.setBody(simple("${date:now:yyyy-MM-dd HH:mm:ss}")) //Set the current date and time for the BODY of the message
						.to("kafka:test01"); //Publish message to topic test01
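
Here, simple("${date:now:yyyy-MM-dd HH:mm:ss}") sets the formatted current date and time as the message body. As a plain-Java sketch of what that expression produces (outside Camel, using only java.time; the class name is just for illustration):

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class DateBodyExample {
    public static void main(String[] args) {
        // Same pattern as Camel's ${date:now:yyyy-MM-dd HH:mm:ss} expression
        DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        String body = LocalDateTime.now().format(fmt);
        System.out.println(body); // e.g. 2019-01-31 21:15:38
    }
}
```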

Next, create a route that subscribes to messages from Kafka. The route written in the Java DSL is as follows.

				from("kafka:test01") //Subscribe message from topic test01
						.routeId("kafka_consumer_route")
						.log("body = ${body}"); //Display the BODY content of the message in the log

These routes alone cannot connect to Kafka, so next we write the connection settings. First, create a KafkaConfiguration instance to hold the Kafka settings and configure the connection there. There are many configuration options, which I will briefly explain later. The example below sets several of them, but if you just want to connect, it is enough to set the brokers with the setBrokers method; everything else can be left at its default value.

		KafkaConfiguration kafkaConfig = new KafkaConfiguration();
		kafkaConfig.setBrokers("kafka1:9092,kafka2:9092,kafka3:9092"); //Connect to 3 kafka brokers
		kafkaConfig.setGroupId("group1"); //Set consumer group ID
		kafkaConfig.setAutoCommitEnable(true); //Turn on autocommit
		kafkaConfig.setAutoCommitIntervalMs(5000); //Set autocommit interval
		kafkaConfig.setAutoOffsetReset("earliest"); //Behavior when subscribing to a partition with no committed offset
		kafkaConfig.setRequestRequiredAcks("all"); //Conditions for judging that publish was successful
		kafkaConfig.setConsumersCount(1); //Number of consumers

Next, create an instance of KafkaComponent and set the instance of KafkaConfiguration created earlier.

		KafkaComponent kafka = new KafkaComponent();
		kafka.setConfiguration(kafkaConfig);

That is all the configuration needed to connect to Kafka.

This completes the producer and consumer routes and the Kafka connection settings. The full source, including the main method that wires these together, is as follows.

package example.camelbegginer.kafka;

import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.kafka.KafkaComponent;
import org.apache.camel.component.kafka.KafkaConfiguration;
import org.apache.camel.impl.DefaultCamelContext;

public class KafkaExample {

	public static void main(String[] args) {
		try {
			CamelContext context = new DefaultCamelContext();
			context.addComponent("kafka", createKafkaComponent());
			context.addRoutes(createProducerRouteBuilder());
			context.addRoutes(createConsumerRouteBuilder());

			context.start();
			Thread.sleep(10000);
			context.stop();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	static KafkaComponent createKafkaComponent() {
		KafkaConfiguration kafkaConfig = new KafkaConfiguration();
		kafkaConfig.setBrokers("kafka1:9092,kafka2:9092,kafka3:9092"); //Connect to 3 kafka brokers
		kafkaConfig.setGroupId("group1"); //Set group ID
		kafkaConfig.setAutoCommitEnable(true); //Turn on autocommit
		kafkaConfig.setAutoCommitIntervalMs(5000); //Set autocommit interval
		kafkaConfig.setAutoOffsetReset("earliest"); //Behavior when reading a partition with no committed offset
		kafkaConfig.setRequestRequiredAcks("all"); //Conditions for judging that publish was successful
		kafkaConfig.setConsumersCount(1); //Number of consumers

		KafkaComponent kafka = new KafkaComponent();
		kafka.setConfiguration(kafkaConfig);

		return kafka;
	}

	static RouteBuilder createProducerRouteBuilder() {
		return new RouteBuilder() {
			public void configure() throws Exception {
				from("timer:trigger?period=1000") //Run every 1000 milliseconds
						.routeId("kafka_producer_route")
						.setBody(simple("${date:now:yyyy-MM-dd HH:mm:ss}")) //Set the current date and time for the BODY of the message
						.to("kafka:test01"); //Publish message to topic test01
			}
		};
	}

	static RouteBuilder createConsumerRouteBuilder() {
		return new RouteBuilder() {
			public void configure() throws Exception {
				from("kafka:test01") //Subscribe message from topic test01
						.routeId("kafka_consumer_route")
						.log("body = ${body}"); //Display the BODY content of the message in the log
			}
		};
	}
}

Run the Camel application you created

When you run this application, log output like the following appears every second.

[2019-01-31 21:15:38.112], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, body = 2019-01-31 21:15:38
[2019-01-31 21:15:39.031], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, body = 2019-01-31 21:15:39
[2019-01-31 21:15:40.034], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, body = 2019-01-31 21:15:40
[2019-01-31 21:15:41.026], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, body = 2019-01-31 21:15:41
[2019-01-31 21:15:42.029], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, body = 2019-01-31 21:15:42
[2019-01-31 21:15:43.024], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, body = 2019-01-31 21:15:43
[2019-01-31 21:15:44.044], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, body = 2019-01-31 21:15:44
[2019-01-31 21:15:45.028], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, body = 2019-01-31 21:15:45
[2019-01-31 21:15:46.032], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, body = 2019-01-31 21:15:46

Also, if you log the headers with the following code, the message's offset and partition information will be output to the log.

				from("kafka:test01") //Subscribe message from topic test01
						.routeId("kafka_consumer_route")
						.log("body = ${body}") //Display the BODY content of the message in the log
						.log("${headers}"); //★ Add here

The output log is as follows.

[2019-02-01 10:10:46.236], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, {breadcrumbId=[B@58a2fc46, kafka.HEADERS=RecordHeaders(headers = [RecordHeader(key = breadcrumbId, value = [73, 68, 45, 109, 107, 121, 45, 80, 67, 45, 49, 53, 52, 56, 57, 56, 51, 52, 52, 51, 56, 55, 51, 45, 48, 45, 49])], isReadOnly = false), kafka.OFFSET=52, kafka.PARTITION=0, kafka.TIMESTAMP=1548983446195, kafka.TOPIC=test01}
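
In the header dump above, breadcrumbId appears as [B@58a2fc46 because Kafka header values are raw byte arrays. Decoding such a value into text takes one line; the sketch below (class name is just for illustration) decodes the first six bytes of the breadcrumbId value shown in the log:

```java
import java.nio.charset.StandardCharsets;

public class HeaderDecodeExample {
    public static void main(String[] args) {
        // First six bytes of the breadcrumbId header value from the log above
        byte[] value = {73, 68, 45, 109, 107, 121};
        String decoded = new String(value, StandardCharsets.UTF_8);
        System.out.println(decoded); // prints "ID-mky"
    }
}
```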

In this way, we used the Kafka component of the Apache Camel framework to create an application that sends and receives simple messages. I hope you can see how easy Camel makes it to connect to Kafka.

Main properties of camel-kafka component

Finally, I will explain the main properties of the camel-kafka component. There are many more properties than those in the table below; see the official page for details.

-Kafka Component (former official website)

There are many properties, but some are the same as those used by the Kafka client, so they should feel familiar if you have used the Kafka client before.

| Property name | Producer/Consumer | Description | Default | Type |
| --- | --- | --- | --- | --- |
| brokers | common | Specifies the Kafka brokers in host1:port1 format. To specify multiple brokers, separate them with commas: host1:port1,host2:port2. | | String |
| clientId | common | A user-specified string used to trace calls from the client application. | | String |
| autoCommitEnable | consumer | If true, the consumer periodically auto-commits the offsets of retrieved messages. The interval is set with the autoCommitIntervalMs option. | true | Boolean |
| autoCommitIntervalMs | consumer | Interval (in milliseconds) at which the consumer auto-commits message offsets. | 5000 | Integer |
| autoCommitOnStop | consumer | Whether to explicitly commit the offset of the last subscribed message when the consumer stops. Requires autoCommitEnable to be on. Possible values: sync, async, none. | sync | String |
| autoOffsetReset | consumer | Behavior when there is no initial offset or the offset is out of range: earliest resets to the earliest offset; latest resets to the latest offset; fail throws an exception to the consumer. | latest | String |
| consumerRequestTimeoutMs | consumer | Maximum time the client waits for a response to a request. If no response is received before the timeout, the client resends the request as needed, or fails it when retries are exhausted. | 40000 | Integer |
| consumersCount | consumer | Number of consumers that connect to the Kafka server. | 1 | int |
| consumerStreams | consumer | Number of concurrent consumers. | 10 | int |
| groupId | consumer | A string identifying the consumer group this consumer belongs to. Consumers with the same group ID all belong to the same consumer group. Required for consumers. | | String |
| maxPollRecords | consumer | Maximum number of records returned in a single call to poll(). | 500 | Integer |
| pollTimeoutMs | consumer | Timeout (in milliseconds) used when polling the KafkaConsumer. | 5000 | Long |
| compressionCodec | producer | Compression codec for data generated by the producer. Possible values: none, gzip, snappy. No compression (none) by default. | none | |
| requestRequiredAcks | producer | Condition for considering a producer request complete; this specifies the reliability level of messages. acks=0: the producer considers the send complete without waiting for acknowledgment from the server; there is no guarantee Kafka received the message, and the offset returned for each record is always -1. acks=1: the leader writes the message locally and responds without waiting for the other followers; if the leader fails right after acknowledging, the followers may not have replicated the message and it is lost. acks=all: the leader waits until all in-sync replicas have completed the write before responding; messages are not lost as long as at least one in-sync replica is alive. Since acks=0 loses messages if the leader broker goes down, normally specify 1 or all. | 1 | String |
| requestTimeoutMs | producer | Time the broker waits trying to satisfy the requestRequiredAcks requirement before returning an error to the client (producer). | 305000 | Integer |
| retries | producer | If set to a value greater than 0, the client resends (retries) records that failed to send due to a transient error. | 0 | Integer |
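
As the brokers row notes, multiple brokers are supplied as a single comma-separated host:port list. A small plain-Java sketch of that format (the class name is just for illustration, not part of camel-kafka):

```java
import java.util.Arrays;
import java.util.List;

public class BrokerListExample {
    public static void main(String[] args) {
        // Comma-separated host:port list, as expected by the brokers option
        String brokers = "kafka1:9092,kafka2:9092,kafka3:9092";
        List<String> hosts = Arrays.asList(brokers.split(","));
        System.out.println(hosts.size());
        System.out.println(hosts.get(0));
    }
}
```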

References

-Send and receive simple messages using the Kafka component of the Apache Camel framework (old article)
-Kafka Component (official site)
-Kafka Component (former official website)
