如何使用 kafka-python 以编程方式创建主题?
How to programmatically create topics using kafka-python?
我刚开始使用 Kafka,对 Python 还很陌生。我正在使用这个名为 kafka-python
的库与我的 Kafka 代理进行通信。现在我需要从我的代码动态创建一个主题,从文档中我看到我可以调用 create_topics()
方法来这样做,但是我不确定,我将如何获得这个 [=19= 的实例].我无法从文档中理解这一点。
有人可以帮我解决这个问题吗?
您首先需要创建一个 KafkaAdminClient
的实例。以下应该可以为您解决问题:
from kafka.admin import KafkaAdminClient, NewTopic
admin_client = KafkaAdminClient(
bootstrap_servers="localhost:9092",
client_id='test'
)
topic_list = [NewTopic(name="example_topic", num_partitions=1, replication_factor=1)]
admin_client.create_topics(new_topics=topic_list, validate_only=False)
或者,您可以使用 confluent_kafka
client which is a lightweight wrapper around librdkafka:
from confluent_kafka.admin import AdminClient, NewTopic
admin_client = AdminClient({"bootstrap_servers": "localhost:9092"})
topic_list = [NewTopic("example_topic", 1, 1)]
admin_client.create_topics(topic_list)
我刚开始使用 Kafka,对 Python 还很陌生。我正在使用这个名为 kafka-python
的库与我的 Kafka 代理进行通信。现在我需要从我的代码动态创建一个主题,从文档中我看到我可以调用 create_topics()
方法来这样做,但是我不确定,我将如何获得这个 [=19= 的实例].我无法从文档中理解这一点。
有人可以帮我解决这个问题吗?
您首先需要创建一个 KafkaAdminClient
的实例。以下应该可以为您解决问题:
from kafka.admin import KafkaAdminClient, NewTopic
admin_client = KafkaAdminClient(
bootstrap_servers="localhost:9092",
client_id='test'
)
topic_list = [NewTopic(name="example_topic", num_partitions=1, replication_factor=1)]
admin_client.create_topics(new_topics=topic_list, validate_only=False)
或者,您可以使用 confluent_kafka
client which is a lightweight wrapper around librdkafka:
from confluent_kafka.admin import AdminClient, NewTopic
admin_client = AdminClient({"bootstrap_servers": "localhost:9092"})
topic_list = [NewTopic("example_topic", 1, 1)]
admin_client.create_topics(topic_list)