First, create an implementation of `BatchMessageListener`.

    import java.util.List;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.listener.BatchMessageListener;
    import org.springframework.stereotype.Component;

    @Component
    public class SampleBatchMessageListener implements BatchMessageListener<String, String> {

        // Receives every record returned by one poll as a single list.
        @KafkaListener(topics = "mytopic2")
        @Override
        public void onMessage(List<ConsumerRecord<String, String>> data) {
            // Print the batch size followed by the records themselves.
            System.out.println(data.size() + ":" + data);
        }
    }

Then change `spring.kafka.listener.type` to `BATCH`.
src/main/resources/application.properties

    spring.kafka.bootstrap-servers=localhost:32770
    spring.kafka.consumer.group-id=sample-group
    spring.kafka.listener.type=BATCH

Now start the Spring Boot application and publish some messages to `mytopic2`. The listener prints output like this:

    5:[ConsumerRecord(topic = mytopic2, partition = 0, leaderEpoch = 0,offs (omitted below)
    1:[ConsumerRecord(topic = mytopic2, partition = 0, leaderEpoch = 0,offset (omitted below)
    3:[ConsumerRecord(topic = mytopic2, partition = 0, leaderEpoch = 0,o (omitted below)

You can see that the number of records delivered in each call varies.
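One way to picture why the sizes differ: in batch mode the listener receives whatever a single poll returned, so the count depends on how many records had accumulated since the previous poll. The following is only a toy model in plain Java, not Kafka client code. `PollModel`, `publish`, `poll`, and the cap of 500 are names and values made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy model (assumption, not the Kafka client): a poll hands over whatever
// is waiting, up to a hypothetical cap on records per poll.
class PollModel {

    // Simulate a producer appending `count` records to the pending queue.
    static void publish(Queue<String> pending, int count) {
        for (int i = 0; i < count; i++) {
            pending.add("msg-" + i);
        }
    }

    // Remove and return up to `maxRecords` records that are currently waiting.
    static List<String> poll(Queue<String> pending, int maxRecords) {
        List<String> batch = new ArrayList<>();
        while (batch.size() < maxRecords && !pending.isEmpty()) {
            batch.add(pending.remove());
        }
        return batch;
    }

    public static void main(String[] args) {
        Queue<String> pending = new ArrayDeque<>();
        int cap = 500; // hypothetical upper bound per poll

        // Different numbers of records arrive between polls.
        for (int arrived : new int[] {5, 1, 3}) {
            publish(pending, arrived);
            System.out.println(poll(pending, cap).size()); // prints 5, then 1, then 3
        }
    }
}
```

In this model, lowering the cap below the number of waiting records would split them across several polls, so the batch size is shaped both by arrival timing and by the upper bound.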
The above works, but in real operation it is probably of little use unless the following parameters are also set. Receiving messages in batches of varying size does not mean the batches can be treated exactly like the commit interval of an RDBMS batch job.
spring.kafka.consumer.fetch-max-wait
spring.kafka.consumer.fetch-min-size
spring.kafka.consumer.max-poll-records
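
For reference, these three Spring Boot properties correspond to the native Kafka consumer settings `fetch.max.wait.ms`, `fetch.min.bytes`, and `max.poll.records`. The sketch below only builds a `java.util.Properties` object with those keys to show the mapping. The class name `ConsumerTuningSketch` and the values 1000, 1024, and 100 are assumptions chosen for illustration, not recommendations.

```java
import java.util.Properties;

// Sketch only: shows which native consumer keys the Spring Boot properties
// map to. The values are hypothetical examples.
class ConsumerTuningSketch {

    static Properties batchTuning() {
        Properties props = new Properties();
        // spring.kafka.consumer.fetch-max-wait -> fetch.max.wait.ms
        // Longest time the broker holds a fetch while waiting for enough data.
        props.setProperty("fetch.max.wait.ms", "1000");
        // spring.kafka.consumer.fetch-min-size -> fetch.min.bytes
        // Minimum amount of data (in bytes) the broker tries to gather per fetch.
        props.setProperty("fetch.min.bytes", "1024");
        // spring.kafka.consumer.max-poll-records -> max.poll.records
        // Upper bound on the number of records returned by one poll.
        props.setProperty("max.poll.records", "100");
        return props;
    }

    public static void main(String[] args) {
        Properties props = batchTuning();
        System.out.println("fetch.max.wait.ms=" + props.getProperty("fetch.max.wait.ms"));
        System.out.println("fetch.min.bytes=" + props.getProperty("fetch.min.bytes"));
        System.out.println("max.poll.records=" + props.getProperty("max.poll.records"));
    }
}
```

Note that `fetch.min.bytes` is expressed in bytes rather than records, and `max.poll.records` is only an upper bound, so these settings influence the batch size without pinning it to an exact count.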
Batch processing is a useful idea in Kafka as well, but it seems that Kafka-specific behavior has to be taken into account.
Documentation: https://docs.spring.io/spring-kafka/docs/current/reference/html/#message-listeners