Python kafka: Is there a way to block a consumer on a kafka topic till a new message is posted?
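I have a consumer subscribed to a test topic that a producer thread publishes to periodically. I would like to be able to block the consumer thread until a new message appears, then process it and start waiting again. The closest I have come is: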
consumer = KafkaConsumer(topic_name, auto_offset_reset='latest',
                         bootstrap_servers=[localhost_],
                         api_version=(0, 10), consumer_timeout_ms=1000)
while True:
    print(consumer.poll(timeout_ms=5000))
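Is there a more idiomatic way to do this (or does this approach have some serious problem I can't see)?
I'm new to Kafka, so general suggestions on this design are very welcome. Full (running) example: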
import time
from threading import Thread
import kafka
from kafka import KafkaProducer, KafkaConsumer
print('python-kafka:', kafka.__version__)
def publish_message(producer_instance, topic_name, key, value):
    try:
        key_bytes = bytes(str(key), encoding='utf-8')
        value_bytes = bytes(str(value), encoding='utf-8')
        producer_instance.send(topic_name, key=key_bytes, value=value_bytes)
        producer_instance.flush()
    except Exception as ex:
        print('Exception in publishing message\n', ex)
localhost_ = 'localhost:9092'
def kafka_producer():
    _producer = None
    try:
        _producer = KafkaProducer(bootstrap_servers=[localhost_],
                                  api_version=(0, 10))
    except Exception as ex:
        print('Exception while connecting Kafka')
        print(str(ex))
    j = 0
    while True:
        publish_message(_producer, topic_name, value=j, key=j)
        j += 1
        time.sleep(5)
if __name__ == '__main__':
    print('Running Producer..')
    topic_name = 'test'
    prod_thread = Thread(target=kafka_producer)
    prod_thread.start()
    consumer = KafkaConsumer(topic_name, auto_offset_reset='latest',
                             bootstrap_servers=[localhost_],
                             api_version=(0, 10), consumer_timeout_ms=1000)
    # consumer.subscribe([topic_name])
    while True:
        print(consumer.poll(timeout_ms=5000))
Output: python-kafka: 1.3.5
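A possibly more idiomatic alternative in kafka-python is to iterate over the consumer itself: `KafkaConsumer` is an iterator, and `consumer_timeout_ms` (default: block forever) is how long iteration waits for a message before raising `StopIteration`. It does not affect `poll()`. With the question's `consumer_timeout_ms=1000`, a `for` loop over that consumer would end after one idle second; leaving the setting at its default makes the loop block until the next message. The sketch below is written against any iterable of records so it runs without a broker; `consume_forever` and `handle` are hypothetical names, and the broker wiring is only shown in comments.

```python
from typing import Callable, Iterable, TypeVar

R = TypeVar("R")


def consume_forever(records: Iterable[R], handle: Callable[[R], None]) -> int:
    """Pass each record to `handle` as it arrives; return how many were handled.

    With a kafka-python KafkaConsumer created *without* consumer_timeout_ms,
    iteration blocks until the next message, so this only returns if the
    iterator ends (e.g. consumer_timeout_ms was set and expired).
    """
    count = 0
    for record in records:  # on a live consumer, this waits between messages
        handle(record)
        count += 1
    return count


if __name__ == "__main__":
    # Hypothetical wiring against a real broker (not executed here):
    # from kafka import KafkaConsumer
    # consumer = KafkaConsumer("test", bootstrap_servers=["localhost:9092"],
    #                          auto_offset_reset="latest")
    # consume_forever(consumer, lambda r: print(r.offset, r.key, r.value))

    # Broker-free demo: a plain list stands in for the consumer.
    seen = []
    n = consume_forever([b"0", b"1", b"2"], seen.append)
    print(n, seen)
```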
Polling in an infinite loop is also what Kafka: The Definitive Guide recommends. Here is a Java excerpt from Chapter 4, Kafka Consumers: Reading Data from Kafka:
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        ...
    }
}
The same pattern carries over to the recommended usage of the Python libraries.
kafka-python (see the full example in A Tale of Two Kafka Clients):
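from kafka import KafkaConsumer
...
kafka_consumer = KafkaConsumer(
    ...
)
kafka_consumer.subscribe([topic])
running = True
while running:
    # poll() defaults to timeout_ms=0 and returns immediately; a timeout makes it wait
    message = kafka_consumer.poll(timeout_ms=1000)
    ...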
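A fuller sketch of that kafka-python loop, under these assumptions: `poll(timeout_ms=...)` blocks for up to that long and returns a dict mapping each `TopicPartition` to a list of `ConsumerRecord`s (an empty dict if nothing arrived), so the loop waits without spinning. Calling `poll()` with no argument uses `timeout_ms=0` and returns immediately, which turns the loop into a busy-wait. Here the `running` flag is swapped for a `threading.Event` so another thread can request shutdown, and the consumer is closed in `finally`. `run_poll_loop`, `handle`, and `FakeKafkaPythonConsumer` are hypothetical; the fake only replays canned `poll()` results so the code runs without a broker.

```python
import threading
from typing import Any, Callable, Dict, List


def run_poll_loop(consumer: Any, handle: Callable[[Any], None],
                  stop: threading.Event, timeout_ms: int = 1000) -> int:
    """Poll until `stop` is set, passing every record to `handle`.

    Assumes kafka-python KafkaConsumer behaviour: poll(timeout_ms=...) returns
    {TopicPartition: [ConsumerRecord, ...]} or {} on timeout; close() cleans up.
    """
    handled = 0
    try:
        while not stop.is_set():
            # Blocks for up to timeout_ms; {} just means "nothing new yet".
            batch: Dict[Any, List[Any]] = consumer.poll(timeout_ms=timeout_ms)
            for records in batch.values():
                for record in records:
                    handle(record)
                    handled += 1
    finally:
        consumer.close()
    return handled


class FakeKafkaPythonConsumer:
    """Broker-free stand-in that replays canned poll() results (demo only)."""

    def __init__(self, batches, stop):
        self._batches = list(batches)
        self._stop = stop
        self.closed = False

    def poll(self, timeout_ms=0):
        if not self._batches:
            self._stop.set()  # simulate another thread asking the loop to stop
            return {}
        return self._batches.pop(0)

    def close(self):
        self.closed = True


if __name__ == "__main__":
    stop = threading.Event()
    fake = FakeKafkaPythonConsumer([{"tp0": ["a", "b"]}, {}, {"tp1": ["c"]}], stop)
    out = []
    print(run_poll_loop(fake, out.append, stop), out, fake.closed)
```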
confluent-kafka-python (see the full example in Introduction to Apache Kafka for Python Programmers):
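from confluent_kafka import Consumer, KafkaError
...
c = Consumer(settings)
c.subscribe(['mytopic'])
try:
    while True:
        msg = c.poll(0.1)  # waits up to 0.1 s; returns None if nothing arrived
        ...
finally:
    c.close()  # leave the consumer group cleanly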
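With confluent-kafka, `Consumer.poll(timeout)` takes seconds and returns a single `Message`, or `None` if the timeout expired; a returned message can also carry an error, reported by `msg.error()`. A sketch of the loop body under those assumptions, written against a duck-typed consumer so it runs without a broker. `poll_confluent`, `FakeMsg`, and `FakeConfluentConsumer` are hypothetical, and the `max_polls` parameter exists only so the demo terminates (a real service would leave it as `None` and loop forever).

```python
import itertools
from typing import Any, Callable, List, Optional


def poll_confluent(c: Any, handle: Callable[[bytes], None],
                   timeout: float = 0.1,
                   max_polls: Optional[int] = None) -> List[Any]:
    """Run the confluent-kafka style poll loop, returning errors seen.

    Assumes c.poll(timeout) returns None on timeout, or a message object with
    error() -> error-or-None and value() -> bytes, as confluent_kafka.Message does.
    """
    errors: List[Any] = []
    polls = itertools.count() if max_polls is None else range(max_polls)
    for _ in polls:
        msg = c.poll(timeout)
        if msg is None:
            continue  # timed out with no message; keep waiting
        if msg.error():
            errors.append(msg.error())  # e.g. log it; it is not payload data
            continue
        handle(msg.value())
    return errors


class FakeMsg:
    """Demo-only stand-in for confluent_kafka.Message."""

    def __init__(self, value=None, err=None):
        self._value, self._err = value, err

    def value(self):
        return self._value

    def error(self):
        return self._err


class FakeConfluentConsumer:
    """Demo-only consumer: returns queued messages, then None (a timeout)."""

    def __init__(self, msgs):
        self._msgs = list(msgs)

    def poll(self, timeout):
        return self._msgs.pop(0) if self._msgs else None


if __name__ == "__main__":
    got = []
    fake = FakeConfluentConsumer([None, FakeMsg(b"x"), FakeMsg(err="boom"), FakeMsg(b"y")])
    errs = poll_confluent(fake, got.append, max_polls=5)
    print(got, errs)
```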
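Another closely related question that may come up is how you process the messages.
That part of your code may rely on external dependencies (databases, remote services, network file systems, etc.), which can cause processing attempts to fail.
So it may be a good idea to implement retry logic; you can find a detailed description of it in the blog post Retrying consumer architecture in the Apache Kafka.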
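Without reproducing the architecture from that post, here is a much simpler, swapped-in illustration of the same concern: an in-process retry with exponential backoff around a single message handler. `process_with_retry`, its parameters, and the injectable `sleep` are hypothetical; catching every `Exception` is a simplification (real code would retry only transient errors), and a message that still fails after the last attempt is reported back to the caller, for example so it can be parked elsewhere, rather than silently dropped.

```python
import time
from typing import Callable, Optional, Tuple, TypeVar

T = TypeVar("T")


def process_with_retry(handler: Callable[[T], None], message: T,
                       max_attempts: int = 3, base_delay: float = 0.5,
                       sleep: Callable[[float], None] = time.sleep
                       ) -> Tuple[bool, Optional[Exception]]:
    """Call handler(message), retrying on exceptions with exponential backoff.

    Waits base_delay, 2*base_delay, 4*base_delay, ... between attempts.
    Returns (True, None) on success, or (False, last_exception) once all
    attempts are used up.
    """
    last_exc: Optional[Exception] = None
    for attempt in range(max_attempts):
        try:
            handler(message)
            return True, None
        except Exception as exc:  # simplification: narrow to transient errors
            last_exc = exc
            if attempt < max_attempts - 1:
                sleep(base_delay * (2 ** attempt))
    return False, last_exc


if __name__ == "__main__":
    calls = {"n": 0}

    def flaky(msg):
        # Hypothetical handler whose "database" fails twice, then recovers.
        calls["n"] += 1
        if calls["n"] < 3:
            raise ConnectionError("database unavailable")

    delays = []
    print(process_with_retry(flaky, b"payload", sleep=delays.append), delays)
```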