在 python 中保持 Kafka 消费者存活的最佳实践是什么?
What is the best practice for keeping Kafka consumer alive in python?
在让消费者保持活力方面,有些事情让我感到困惑。假设我有一个不断写入数据的主题。但是,一天中的一个小时内,没有新消息。如果我为我的消费者设置超时,当没有新消息时,消费者将关闭。
现在,新消息到达。但是,没有活着的消费者来消费它们。
我应该如何处理这种情况?我的消费者可能会消费所有消息并关闭。让他们活着的最好方法是什么?有没有办法在新消息到达时自动调用它们?此类场景的最佳做法是什么?
为什么不只是
import time
from confluent_kafka import Consumer
consumer = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'my-consumer-1',
'auto.offset.reset': 'earliest'
})
consumer.subscribe(['topicName'])
while True:
try:
message = consumer.poll(10.0)
if not message:
time.sleep(120) # Sleep for 2 minutes
if message.error():
print(f"Consumer error: {message.error()}")
continue
print(f"Received message: {msg.value().decode('utf-8')}")
except:
# Handle any exception here
...
finally:
consumer.close()
print("Goodbye")
我无法评论“为消费者设置超时”的要求,但在大多数情况下,消费者应该运行“永远”并且应该也可以以高度可用的方式添加到消费者组。
使用生成器函数
def consumableMessages(self):
self.kafka.subscribe(self.topic)
try:
for message in self.kafka:
yield message.value.decode("utf-8")
except KeyboardInterrupt:
self.kafka.close()
然后我们就可以等消息了:
for message in kafka.consumableMessages():
print(message)
在让消费者保持活力方面,有些事情让我感到困惑。假设我有一个不断写入数据的主题。但是,一天中的一个小时内,没有新消息。如果我为我的消费者设置超时,当没有新消息时,消费者将关闭。
现在,新消息到达。但是,没有活着的消费者来消费它们。
我应该如何处理这种情况?我的消费者可能会消费所有消息并关闭。让他们活着的最好方法是什么?有没有办法在新消息到达时自动调用它们?此类场景的最佳做法是什么?
为什么不只是
import time
from confluent_kafka import Consumer
consumer = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'my-consumer-1',
'auto.offset.reset': 'earliest'
})
consumer.subscribe(['topicName'])
while True:
try:
message = consumer.poll(10.0)
if not message:
time.sleep(120) # Sleep for 2 minutes
if message.error():
print(f"Consumer error: {message.error()}")
continue
print(f"Received message: {msg.value().decode('utf-8')}")
except:
# Handle any exception here
...
finally:
consumer.close()
print("Goodbye")
我无法评论“为消费者设置超时”的要求,但在大多数情况下,消费者应该运行“永远”并且应该也可以以高度可用的方式添加到消费者组。
使用生成器函数
def consumableMessages(self):
self.kafka.subscribe(self.topic)
try:
for message in self.kafka:
yield message.value.decode("utf-8")
except KeyboardInterrupt:
self.kafka.close()
然后我们就可以等消息了:
for message in kafka.consumableMessages():
print(message)