Apache Kafka is an open-source distributed messaging system that lets you publish and subscribe to large volumes of messages at high speed. In this article, we will create an application that exchanges messages with Kafka using Apache Camel, an integration framework. The camel-kafka component is used to connect Apache Camel to Kafka.
Kafka v2.0.0 is used in this environment. See the following article for how to set it up.
- Apache Kafka v2.0.0 has been released, so let's try installing it
I previously wrote a similar application in the XML DSL in the article below; this time I will build it with the Java DSL, and in more detail than last time.
- Send and receive simple messages using the Kafka component of the Apache Camel framework
The application created here publishes a date-and-time message to Kafka every second, as shown in the figure below, and subscribes to that same message from Kafka.
With Apache Camel, an application like this can be implemented in just the few dozen lines of code below. It is not pretty code, since everything is crammed into the main function, but it should show how easily this can be done with Camel.
```java
public static void main(String[] args) {
    try {
        CamelContext context = new DefaultCamelContext();
        KafkaComponent kafka = new KafkaComponent();
        kafka.setBrokers("kafka1:9092,kafka2:9092,kafka3:9092"); // Register the three servers kafka1, kafka2, kafka3 as brokers
        context.addComponent("kafka", kafka);
        context.addRoutes(new RouteBuilder() {
            public void configure() {
                from("timer:trigger?period=1000") // Run every 1000 milliseconds
                    .routeId("kafka_producer_route")
                    .setBody(simple("${date:now:yyyy-MM-dd HH:mm:ss}")) // Set the current date and time as the message body
                    .to("kafka:test01"); // Publish the message to topic test01
                from("kafka:test01") // Subscribe to messages from topic test01
                    .routeId("kafka_consumer_route")
                    .log("body = ${body}"); // Log the body of the message
            }
        });
        context.start();
        Thread.sleep(10000);
        context.stop();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
```
Now let's build the Kafka application.
With Maven, add the following libraries to pom.xml. camel-kafka is the component for working with Kafka from Camel, and ${camel.version} specifies the version of Camel you are using. kafka-clients is the client library for Kafka.
pom.xml

```xml
<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-kafka</artifactId>
    <version>${camel.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.0.0</version>
</dependency>
```
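${camel.version} refers to a Maven property. If it is not already defined in your pom.xml, it can be declared as below; the version number shown is an assumption for illustration, so use the Camel version you actually have.

```xml
<properties>
    <!-- Hypothetical value: replace with the Camel version you are actually using -->
    <camel.version>2.23.0</camel.version>
</properties>
```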
The URI for using the camel-kafka component looks like this:
```
kafka:topic[?options]
```
Specify the target topic name in the context path (topic).
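Options are appended to the URI as a query string. As a minimal sketch using only the JDK, the endpoint URI is just a plain string; the helper below is hypothetical (not part of camel-kafka), and the option names used (groupId, autoOffsetReset) come from the options table later in this article.

```java
// Hypothetical helper: assembles a camel-kafka endpoint URI string by hand.
public class KafkaUriExample {
    static String kafkaUri(String topic, String options) {
        // With no options, the URI is just "kafka:" plus the topic (context path).
        return (options == null || options.isEmpty())
                ? "kafka:" + topic
                : "kafka:" + topic + "?" + options;
    }

    public static void main(String[] args) {
        System.out.println(kafkaUri("test01", "")); // kafka:test01
        System.out.println(kafkaUri("test01", "groupId=group1&autoOffsetReset=earliest"));
        // kafka:test01?groupId=group1&autoOffsetReset=earliest
    }
}
```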
First, create a route that publishes messages to Kafka. Written in the Java DSL, the route looks like this.
The topic is created automatically, but if you want to change the topic settings, create it manually beforehand. The code is simple enough that it needs little explanation.
```java
from("timer:trigger?period=1000") // Run every 1000 milliseconds
    .routeId("kafka_producer_route")
    .setBody(simple("${date:now:yyyy-MM-dd HH:mm:ss}")) // Set the current date and time as the message body
    .to("kafka:test01"); // Publish the message to topic test01
```
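The simple expression ${date:now:yyyy-MM-dd HH:mm:ss} formats the current date and time. A JDK-only sketch of what that pattern produces (the class name is made up for illustration):

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: applies the same yyyy-MM-dd HH:mm:ss pattern
// that the simple expression in the route uses.
public class DatePatternExample {
    static final DateTimeFormatter PATTERN = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    static String format(LocalDateTime dateTime) {
        return dateTime.format(PATTERN);
    }

    public static void main(String[] args) {
        // A fixed timestamp makes the result predictable:
        System.out.println(format(LocalDateTime.of(2019, 1, 31, 21, 15, 38))); // 2019-01-31 21:15:38
    }
}
```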
Next, create a route that subscribes to messages from Kafka. Written in the Java DSL, it looks like this.
```java
from("kafka:test01") // Subscribe to messages from topic test01
    .routeId("kafka_consumer_route")
    .log("body = ${body}"); // Log the body of the message
```
This alone is not enough to connect to Kafka, so next we write the connection settings. First, create an instance of KafkaConfiguration to hold the Kafka settings and configure the connection there. There are many configuration items, which I will briefly explain later. The example below sets several of them, but if you just want to connect, it is enough to set the brokers with the setBrokers method; everything else can be left at its default value.
```java
KafkaConfiguration kafkaConfig = new KafkaConfiguration();
kafkaConfig.setBrokers("kafka1:9092,kafka2:9092,kafka3:9092"); // Connect to three Kafka brokers
kafkaConfig.setGroupId("group1"); // Set the consumer group ID
kafkaConfig.setAutoCommitEnable(true); // Turn on auto-commit
kafkaConfig.setAutoCommitIntervalMs(5000); // Set the auto-commit interval
kafkaConfig.setAutoOffsetReset("earliest"); // Behavior when consuming a partition with no committed offset
kafkaConfig.setRequestRequiredAcks("all"); // Condition for considering a publish successful
kafkaConfig.setConsumersCount(1); // Number of consumers
```
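As an aside, the value passed to setBrokers is a comma-separated list of host:port pairs. A JDK-only sketch of that format (the helper class is hypothetical, not part of camel-kafka):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper: splits a broker list string of the form
// "host1:port1,host2:port2,..." into its host names.
public class BrokerListExample {
    static List<String> hosts(String brokers) {
        return Arrays.stream(brokers.split(","))
                .map(b -> b.substring(0, b.indexOf(':'))) // keep only the host part
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(hosts("kafka1:9092,kafka2:9092,kafka3:9092")); // [kafka1, kafka2, kafka3]
    }
}
```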
Next, create an instance of KafkaComponent and set the instance of KafkaConfiguration created earlier.
```java
KafkaComponent kafka = new KafkaComponent();
kafka.setConfiguration(kafkaConfig);
```
That is all the configuration needed to connect to Kafka.
This completes the producer and consumer routes and the Kafka connection settings. The full source, including the main function that ties them together, is shown below.
```java
package example.camelbegginer.kafka;

import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.kafka.KafkaComponent;
import org.apache.camel.component.kafka.KafkaConfiguration;
import org.apache.camel.impl.DefaultCamelContext;

public class KafkaExample {

    public static void main(String[] args) {
        try {
            CamelContext context = new DefaultCamelContext();
            context.addComponent("kafka", createKafkaComponent());
            context.addRoutes(createProducerRouteBuilder());
            context.addRoutes(createConsumerRouteBuilder());
            context.start();
            Thread.sleep(10000);
            context.stop();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    static KafkaComponent createKafkaComponent() {
        KafkaConfiguration kafkaConfig = new KafkaConfiguration();
        kafkaConfig.setBrokers("kafka1:9092,kafka2:9092,kafka3:9092"); // Connect to three Kafka brokers
        kafkaConfig.setGroupId("group1"); // Set the consumer group ID
        kafkaConfig.setAutoCommitEnable(true); // Turn on auto-commit
        kafkaConfig.setAutoCommitIntervalMs(5000); // Set the auto-commit interval
        kafkaConfig.setAutoOffsetReset("earliest"); // Behavior when reading a partition with no committed offset
        kafkaConfig.setRequestRequiredAcks("all"); // Condition for considering a publish successful
        kafkaConfig.setConsumersCount(1); // Number of consumers
        KafkaComponent kafka = new KafkaComponent();
        kafka.setConfiguration(kafkaConfig);
        return kafka;
    }

    static RouteBuilder createProducerRouteBuilder() {
        return new RouteBuilder() {
            public void configure() throws Exception {
                from("timer:trigger?period=1000") // Run every 1000 milliseconds
                    .routeId("kafka_producer_route")
                    .setBody(simple("${date:now:yyyy-MM-dd HH:mm:ss}")) // Set the current date and time as the message body
                    .to("kafka:test01"); // Publish the message to topic test01
            }
        };
    }

    static RouteBuilder createConsumerRouteBuilder() {
        return new RouteBuilder() {
            public void configure() throws Exception {
                from("kafka:test01") // Subscribe to messages from topic test01
                    .routeId("kafka_consumer_route")
                    .log("body = ${body}"); // Log the body of the message
            }
        };
    }
}
```
When this application is run, log lines like the following are output every second.
[2019-01-31 21:15:38.112], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, body = 2019-01-31 21:15:38
[2019-01-31 21:15:39.031], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, body = 2019-01-31 21:15:39
[2019-01-31 21:15:40.034], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, body = 2019-01-31 21:15:40
[2019-01-31 21:15:41.026], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, body = 2019-01-31 21:15:41
[2019-01-31 21:15:42.029], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, body = 2019-01-31 21:15:42
[2019-01-31 21:15:43.024], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, body = 2019-01-31 21:15:43
[2019-01-31 21:15:44.044], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, body = 2019-01-31 21:15:44
[2019-01-31 21:15:45.028], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, body = 2019-01-31 21:15:45
[2019-01-31 21:15:46.032], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, body = 2019-01-31 21:15:46
If you also log the message headers, as in the code below, the offset and partition information of each message is output to the log.
```java
from("kafka:test01") // Subscribe to messages from topic test01
    .routeId("kafka_consumer_route")
    .log("body = ${body}") // Log the body of the message
    .log("${headers}"); // ★ Added here
```
The output log is as follows.
[2019-02-01 10:10:46.236], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, {breadcrumbId=[B@58a2fc46, kafka.HEADERS=RecordHeaders(headers = [RecordHeader(key = breadcrumbId, value = [73, 68, 45, 109, 107, 121, 45, 80, 67, 45, 49, 53, 52, 56, 57, 56, 51, 52, 52, 51, 56, 55, 51, 45, 48, 45, 49])], isReadOnly = false), kafka.OFFSET=52, kafka.PARTITION=0, kafka.TIMESTAMP=1548983446195, kafka.TOPIC=test01}
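The breadcrumbId appears as [B@58a2fc46 because it is a byte[] and Java prints arrays with their identity hash rather than their contents. Decoding the byte values shown in the RecordHeader above recovers the readable ID string; the class below is a hypothetical sketch of that decoding.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper: decodes a Kafka record header value (a byte array)
// into a readable string, as in the breadcrumbId log output above.
public class BreadcrumbDecodeExample {
    static String decode(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] breadcrumb = {73, 68, 45, 109, 107, 121, 45, 80, 67, 45, 49, 53, 52, 56,
                57, 56, 51, 52, 52, 51, 56, 55, 51, 45, 48, 45, 49};
        System.out.println(decode(breadcrumb)); // ID-mky-PC-1548983443873-0-1
    }
}
```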
As shown, the Kafka component of the Apache Camel framework can be used to create an application that sends and receives simple messages. I hope this has demonstrated how easy it is to connect to Kafka with Camel.
Finally, I will explain the main properties of the camel-kafka component. There are many properties beyond those in the table below; see the official page for details.
- Kafka Component (former official website)
There are many properties, but some are the same as those used by the Kafka client, so they should feel familiar.
Property name | producer/consumer | Description | Default value | Type |
---|---|---|---|---|
brokers | Common | Specifies the Kafka brokers, in the format host1:port1. To specify multiple brokers, separate them with commas, e.g. host1:port1,host2:port2. | | String |
clientId | Common | The client ID, a user-specified string used to trace calls from the client application. | | String |
autoCommitEnable | consumer | If set to true, the consumer periodically commits the offsets of consumed messages automatically. The auto-commit interval is specified with the autoCommitIntervalMs option. | true | Boolean |
autoCommitIntervalMs | consumer | Specifies the interval (in milliseconds) at which the consumer automatically commits message offsets. | 5000 | Integer |
autoCommitOnStop | consumer | Specifies whether to explicitly commit the offset of the last consumed message when the consumer stops. Requires the autoCommitEnable option to be turned on. Possible values are sync, async, and none. | sync | String |
autoOffsetReset | consumer | Behavior when there is no initial offset or the offset is out of range. earliest: reset the offset to the very first offset. latest: reset the offset to the latest (last) offset. fail: throw an exception to the consumer. | latest | String |
consumerRequestTimeoutMs | consumer | Controls the maximum amount of time the client waits for a response to a request. If no response is received before the timeout expires, the client resends the request if necessary, or fails the request when the retries are exhausted. | 40000 | Integer |
consumersCount | consumer | Specifies the number of consumers that connect to the Kafka server. | 1 | int |
consumerStreams | consumer | Number of concurrent consumer threads. | 10 | int |
groupId | consumer | Specifies a string identifying the group this consumer belongs to. Consumers that set the same group ID all belong to the same consumer group. Required for consumers. | | String |
maxPollRecords | consumer | Specifies the maximum number of records returned in a single call to poll(). | 500 | Integer |
pollTimeoutMs | consumer | The timeout (in milliseconds) used when polling the KafkaConsumer. | 5000 | Long |
compressionCodec | producer | Specifies the compression codec for the data generated by the producer. Possible values are none, gzip, and snappy. By default, data is not compressed (none). | none | String |
requestRequiredAcks | producer | Sets the condition under which a request from the producer is considered complete; in effect, the message delivery guarantee level. Three values are possible. acks=0: the producer considers the send complete without waiting for any acknowledgment from the server, so there is no guarantee that Kafka received the message, and the offset returned for each record is always -1. acks=1: the leader writes the message locally and responds without waiting for the followers, so the message is lost if the leader fails immediately after acknowledging it, before the followers have replicated it. acks=all: the leader waits until all in-sync replicas have completed the write before responding, which guarantees that the message is not lost as long as at least one in-sync replica remains alive. Since acks=0 loses messages if the leader broker goes down, acks=1 or all is normally specified. | 1 | String |
requestTimeoutMs | producer | The time the broker waits to satisfy the requestRequiredAcks requirement before returning an error to the client (producer). | 305000 | Integer |
retries | producer | If set to a value greater than 0, the client resends (retries) records that failed to be sent due to a transient error. | 0 | Integer |
- Send and receive simple messages using the Kafka component of the Apache Camel framework (old article)
- Kafka Component (official site)
- Kafka Component (former official website)