如何在 Python 中使用 kafka 客户端描述主题

How to describe a topic using kafka client in Python

我是 python 的 kafka 客户端的初学者,我需要一些帮助来描述使用客户端的主题。

我能够使用以下代码列出我所有的 kafka 主题:-

consumer = kafka.KafkaConsumer(group_id='test', bootstrap_servers=['kafka1'])
topicList = consumer.topics()

有趣的是,Java 此功能 (describeTopics()) 位于 KafkaAdminCLient.java.

所以,我试图寻找相同的 python,然后我发现了 code repository of kafka-python

admin-client 中的文档(in-line 注释)相当于 kafka-python 包中的说明如下:

describe topics functionality is in ClusterMetadata
Note: if implemented here, send the request to the controller

然后我切换到同一个存储库中的 cluster.py file。这包含您用来检索主题列表的 topics() 函数和以下两个可以帮助您实现 describe 功能的函数:

  1. partitions_for_topic() - Return 主题的所有分区集(无论是否可用)
  2. available_partitions_for_topic() - Return 具有已知领导者的分区集

注意:我自己还没有尝试过,所以我不确定行为是否与您在 [=16= 的结果中看到的相同] 命令但值得一试。

希望对您有所帮助!

在参考了多篇文章和代码示例之后,我能够通过 describe_configs 使用 confluent_kafka 来做到这一点。

Link 1 [Confluent-kafka-python] Link 2 Git Sample

下面是我的示例代码!!

from confluent_kafka.admin import AdminClient, NewTopic, NewPartitions, ConfigResource
import confluent_kafka
import concurrent.futures

#Creation of config
conf = {'bootstrap.servers': 'kafka1','session.timeout.ms': 6000}
adminClient = AdminClient(conf)
topic_configResource = adminClient.describe_configs([ConfigResource(confluent_kafka.admin.RESOURCE_TOPIC, "myTopic")])
    for j in concurrent.futures.as_completed(iter(topic_configResource.values())):
        config_response = j.result(timeout=1)

我已经找到了如何使用 kafka-python:

from kafka.admin import KafkaAdminClient, ConfigResource, ConfigResourceType
KAFKA_URL = "localhost:9092" # kafka broker
KAFKA_TOPIC = "test" # topic name

admin_client = KafkaAdminClient(bootstrap_servers=[KAFKA_URL])
configs = admin_client.describe_configs(config_resources=[ConfigResource(ConfigResourceType.TOPIC, KAFKA_TOPIC)])
config_list = configs.resources[0][4]

config_list(元组列表)中,您拥有该主题的所有配置。

参考:https://docs.confluent.io/current/clients/confluent-kafka-python/

  1. list_topics 提供 confluent_kafka.admin.TopicMetadata(主题, 分区)
  2. kafka.admin.TopicMetadata.partitions 提供:confluent_kafka.admin.PartitionMetadata (Partition id, leader, replicas, isrs)

    from confluent_kafka.admin import AdminClient
    kafka_admin = AdminClient({"bootstrap.servers": bootstrap_servers})    
    for topic in topics:    
        x = kafka_admin.list_topics(topic=topic)    
        print x.topics, '\n'    
        for key, value in x.topics.items():    
            for keyy, valuey in value.partitions.items():    
                print keyy, ' Partition id : ', valuey, 'leader : ', valuey.leader,' replica: ', valuey.replicas