Lors de l'implémentation du consommateur Kafka, j'utilise souvent des langages tels que Java et Scala, mais j'aimerais utiliser Python, qui peut être implémenté légèrement si ce n'est pas si compliqué. Cette fois, j'aimerais écrire un processus de maintenance qui déplace le décalage d'un consommateur vers un autre consommateur en utilisant kafka-python de la bibliothèque Apache Kafka pour Python.
Cette fois, nous utiliserons KafkaAdminClient. Vous pouvez l'obtenir auprès de KafkaConsumer, mais il est compliqué de s'abonner et d'interroger le consommateur, il est donc recommandé d'utiliser ce client.
from kafka import KafkaAdminClient
#En supposant que consumer01 s'abonne à topic01
target_consumer_name = "consumer01"
#Obtenez KafkaAdminClient
cluster_admin = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
#Obtenir un décalage
offsets = cluster_admin.list_consumer_group_offsets(target_consumer_name)
Pour référence, quelque chose comme celui-ci sera retourné. Décalez les informations avec la classe TopicPartition comme clé.
{TopicPartition(topic='topic01', partition=0): OffsetAndMetadata(offset=14475, metadata='aa6c00e6-ffbf-41a3-b011-6997549f6166a'),
TopicPartition(topic='topic01', partition=1): OffsetAndMetadata(offset=14494, metadata='8fcc736c-1cb0-41b5-b111-6d55d67b3096a')}
Utilisez Kafka Consumer pour réécrire le consommateur. Écrivez les informations de décalage obtenues précédemment dans consumer02. Si le consommateur n'existe pas, il sera créé automatiquement.
from kafka import KafkaConsumer
consumer_group_name = 'consumer02'
consumer = KafkaConsumer(
group_id=consumer_group_name,
bootstrap_servers=bootstrap_servers,
enable_auto_commit=False)
#Écrire les informations de décalage (créées si le consommateur n'existe pas)
consumer.commit(offsets)
Les informations de décalage de la même rubrique que consumer01 sont désormais écrites dans consumer02. Si vous démarrez votre abonnement à l'aide de consumer02, vous devez démarrer votre abonnement à partir du même décalage que consumer01 lorsque vous avez obtenu le décalage.
consumer_group_name='consumer02'
#Obtenez le consommateur
consumer = KafkaConsumer(
group_id=consumer_group_name,
bootstrap_servers=bootstrap_servers)
#Changer pour s'abonner topic01
consumer.subscribe(topics=['topic01'])
#Participé au topic01 consumer02
consumer.poll()
#Continuer à imprimer les messages reçus
for msg in consumer:
print(msg)
https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html https://kafka-python.readthedocs.io/en/master/apidoc/KafkaAdminClient.html
Recommended Posts