Kafka 消费者在关闭后丢失消息状态
Kafka consumer losing state of messages after shutdown
感谢您抽出时间回答问题。我正在将 kafka 与 python 消费者一起使用。当消费者启动时,一切都很好,运行 消息被推送到 kafka,然后由消费者读取。
但是,如果消费者由于某种原因宕机,当它恢复时,它只会读取消费者恢复后发布到 kafka 的新消息。 shutdown-poweron之间的消息丢失,即consumer回来后没有读取这些消息。
consumer = KafkaConsumer(..)
是我用来创建消费者的。
你用的是什么客户端?
也许有必要为消费者设置起始偏移量。查看 seek() 函数和 auto-commit 设置。
可能我的代码有帮助,但也许我们使用不同的消费者 类(我的:http://kafka-python.readthedocs.org/en/latest/apidoc/kafka.consumer.html):
def connect(self):
'''Initialize Kafka Client and Consumer.'''
try:
print "Try to init KafkaClient:", self.Brokers
self.__kafka_client = KafkaClient( self.Brokers )
print "Try to init Kafka Consumer."
self.__consumer = SimpleConsumer(
self.__kafka_client,
self.GroupID,
self.Topic,
auto_commit = True,
partitions=self.Partitions,
auto_commit_every_n = 100,
auto_commit_every_t=5000,
fetch_size_bytes=4096,
buffer_size=4096,
max_buffer_size=32768,
iter_timeout=None,
auto_offset_reset='largest' )
print "Set the starting offset."
self.__consumer.seek(0, self.OffsetMode)
self.__consumer.seek(0, 0) =>start reading from the beginning of the queue.
self.__consumer.seek(0, 1) =>start reading from current offset.
self.__consumer.seek(0, 2) =>skip all the pending messages and start reading only new messages (** maybeyour case**).
感谢您抽出时间回答问题。我正在将 kafka 与 python 消费者一起使用。当消费者启动时,一切都很好,运行 消息被推送到 kafka,然后由消费者读取。
但是,如果消费者由于某种原因宕机,当它恢复时,它只会读取消费者恢复后发布到 kafka 的新消息。 shutdown-poweron之间的消息丢失,即consumer回来后没有读取这些消息。
consumer = KafkaConsumer(..)
是我用来创建消费者的。
你用的是什么客户端? 也许有必要为消费者设置起始偏移量。查看 seek() 函数和 auto-commit 设置。 可能我的代码有帮助,但也许我们使用不同的消费者 类(我的:http://kafka-python.readthedocs.org/en/latest/apidoc/kafka.consumer.html):
def connect(self):
'''Initialize Kafka Client and Consumer.'''
try:
print "Try to init KafkaClient:", self.Brokers
self.__kafka_client = KafkaClient( self.Brokers )
print "Try to init Kafka Consumer."
self.__consumer = SimpleConsumer(
self.__kafka_client,
self.GroupID,
self.Topic,
auto_commit = True,
partitions=self.Partitions,
auto_commit_every_n = 100,
auto_commit_every_t=5000,
fetch_size_bytes=4096,
buffer_size=4096,
max_buffer_size=32768,
iter_timeout=None,
auto_offset_reset='largest' )
print "Set the starting offset."
self.__consumer.seek(0, self.OffsetMode)
self.__consumer.seek(0, 0) =>start reading from the beginning of the queue.
self.__consumer.seek(0, 1) =>start reading from current offset.
self.__consumer.seek(0, 2) =>skip all the pending messages and start reading only new messages (** maybeyour case**).