When implementing Kafka consumers I usually use Java or Scala, but for tasks that are not too complicated I would rather use Python, which keeps the implementation lightweight. In this post I write a maintenance process that copies the offsets of one consumer to another consumer, using kafka-python, a Python client library for Apache Kafka.
To read the offsets we use KafkaAdminClient. They can also be obtained through KafkaConsumer, but that requires subscribing and polling with the consumer, which is more cumbersome, so I recommend the admin client.
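from kafka import KafkaAdminClient
# Broker address; "localhost:9092" is a placeholder, replace it with your cluster
bootstrap_servers = "localhost:9092"
# Assuming consumer01 is subscribed to topic01
target_consumer_name = "consumer01"
# Get a KafkaAdminClient
cluster_admin = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
# Get the committed offsets of the consumer group
offsets = cluster_admin.list_consumer_group_offsets(target_consumer_name)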
For reference, something like the following is returned: a dict of offset information keyed by TopicPartition.
{TopicPartition(topic='topic01', partition=0): OffsetAndMetadata(offset=14475, metadata='aa6c00e6-ffbf-41a3-b011-6997549f6166a'),
TopicPartition(topic='topic01', partition=1): OffsetAndMetadata(offset=14494, metadata='8fcc736c-1cb0-41b5-b111-6d55d67b3096a')}
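To make a mapping like this easier to read before copying it, it can be regrouped by topic. The following is only a minimal sketch: `group_offsets_by_topic` is a hypothetical helper, and the two namedtuples are stand-ins that mirror just the fields shown above so that the example runs without a broker. With real data, the result of `list_consumer_group_offsets` would be passed in directly.

```python
from collections import namedtuple

# Stand-ins mirroring the fields shown above (assumption: defined here only so
# the sketch runs without a broker; kafka-python provides the real classes).
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])
OffsetAndMetadata = namedtuple("OffsetAndMetadata", ["offset", "metadata"])


def group_offsets_by_topic(offsets):
    """Regroup {TopicPartition: OffsetAndMetadata} as {topic: {partition: offset}}."""
    grouped = {}
    for tp, meta in offsets.items():
        grouped.setdefault(tp.topic, {})[tp.partition] = meta.offset
    return grouped


sample = {
    TopicPartition("topic01", 0): OffsetAndMetadata(14475, "aa6c00e6-ffbf-41a3-b011-6997549f6166a"),
    TopicPartition("topic01", 1): OffsetAndMetadata(14494, "8fcc736c-1cb0-41b5-b111-6d55d67b3096a"),
}
summary = group_offsets_by_topic(sample)
print(summary)  # {'topic01': {0: 14475, 1: 14494}}
```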
Use KafkaConsumer to write the offsets to the other consumer. Commit the offset information obtained earlier as consumer02. If the consumer group does not exist, it is created automatically.
from kafka import KafkaConsumer
consumer_group_name = 'consumer02'
consumer = KafkaConsumer(
    group_id=consumer_group_name,
    bootstrap_servers=bootstrap_servers,
    enable_auto_commit=False)
# Write the offset information (the consumer group is created if it does not exist)
consumer.commit(offsets)
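The read and write steps can be wrapped in one function. This is a minimal sketch, assuming `admin` is a KafkaAdminClient and `consumer` is a KafkaConsumer created with the destination `group_id` and `enable_auto_commit=False`, as above. The function name `copy_group_offsets` and the optional `topics` filter are my own additions for illustration and are not part of kafka-python.

```python
def copy_group_offsets(admin, consumer, source_group, topics=None):
    """Commit the offsets of source_group through consumer (hypothetical helper).

    admin:    object with list_consumer_group_offsets(), e.g. a KafkaAdminClient
    consumer: object with commit(), e.g. a KafkaConsumer of the target group
    topics:   optional collection of topic names to restrict the copy to
    """
    offsets = admin.list_consumer_group_offsets(source_group)
    if topics is not None:
        wanted = set(topics)
        offsets = {tp: meta for tp, meta in offsets.items() if tp.topic in wanted}
    if offsets:
        # Skip the commit when there is nothing to write
        consumer.commit(offsets)
    return offsets
```

With the objects created above, it would be called as `copy_group_offsets(cluster_admin, consumer, "consumer01", topics=["topic01"])`. Passing the clients in as arguments makes it easy to try the function with stand-in objects before running it against a real cluster.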
The offset information for the same topic as consumer01 has now been written to consumer02. When you start a subscription as consumer02, consumption should begin from the offsets that consumer01 had at the time they were read.
consumer_group_name = 'consumer02'
# Get a consumer for the consumer02 group
consumer = KafkaConsumer(
    group_id=consumer_group_name,
    bootstrap_servers=bootstrap_servers)
# Subscribe to topic01
consumer.subscribe(topics=['topic01'])
# Poll once so that consumer02 actually starts participating in topic01
consumer.poll()
# Keep printing received messages
for msg in consumer:
    print(msg)
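To check that the copy worked without consuming any messages, the offsets of both groups can be read again with `list_consumer_group_offsets` and compared. The following is a minimal sketch: `diff_offsets` is a hypothetical helper, and the namedtuples are stand-ins for the kafka-python classes so that the example runs without a broker.

```python
from collections import namedtuple

# Stand-ins for the kafka-python classes (assumption: only the fields used here).
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])
OffsetAndMetadata = namedtuple("OffsetAndMetadata", ["offset", "metadata"])


def diff_offsets(source, target):
    """Return {TopicPartition: (source_offset, target_offset)} where the two differ.

    A partition missing on one side is reported with None for that side.
    """
    diff = {}
    for tp in set(source) | set(target):
        src = source[tp].offset if tp in source else None
        dst = target[tp].offset if tp in target else None
        if src != dst:
            diff[tp] = (src, dst)
    return diff


source = {
    TopicPartition("topic01", 0): OffsetAndMetadata(14475, ""),
    TopicPartition("topic01", 1): OffsetAndMetadata(14494, ""),
}
target = {
    TopicPartition("topic01", 0): OffsetAndMetadata(14475, ""),
    TopicPartition("topic01", 1): OffsetAndMetadata(14000, ""),
}
mismatches = diff_offsets(source, target)
print(mismatches)  # {TopicPartition(topic='topic01', partition=1): (14494, 14000)}
```

Against a real cluster, the two arguments would be `cluster_admin.list_consumer_group_offsets("consumer01")` and `cluster_admin.list_consumer_group_offsets("consumer02")`; an empty result means the groups match. The comparison is best run before consumer02 starts consuming, because its committed offsets move forward after that.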
https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html
https://kafka-python.readthedocs.io/en/master/apidoc/KafkaAdminClient.html