Python Kafka 消费者从一开始就没有收到消息?

Python Kafka consumer doesn't receive the message from beginning?

我在 Windows PC 上安装了 Kafka。创建了一个主题 quickstart-events 并发送了一些消息。 运行 参数为 --from-beginning 的控制台消费者可以接收消息。

.\bin\windows\kafka-console-consumer.bat --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
Picked up JAVA_TOOL_OPTIONS: -agentpath:"C:\WINDOWS\system32\Aternity\Java\JavaHookLoader.dll"="C:\ProgramData\Aternity\hooks"
msg1
msg2
msg3
msg4

但是,运行将 Python 代码与参数 auto_offset_reset='earliest' 结合使用将在第一时间打印消息。然后,它不会在第一个 运行?

之后打印任何消息
from kafka import KafkaConsumer, KafkaProducer
consumer = KafkaConsumer('quickstart-events', bootstrap_servers=['localhost:9092'], auto_offset_reset='smallest')
for msg in consumer:
    print(msg)

TL;DR

每次要从头阅读主题时,您需要提供一个新的 group.id,同时保持设置 auto_offset_reset='earliest':

KafkaConsumer('quickstart-events', bootstrap_servers=['localhost:9092'], auto_offset_reset='smallest', group_id='newGroup')

如果您的代码在第一次 运行ning 时打印输出但在随后的 运行s 中不再打印输出,并且您的问题在重新启动 Kafka(您的 PC)时也已解决,您是在 Kafka 中触及 Consumer Group 的概念。由于这是一个非常重要的概念,我强烈建议您熟悉它 here

应用程序的消费者组确保它不会读取消息两次。每个消费者都有一个消费者组名称(即使您可能不会直接在代码中看到)。 consumer Group的偏移位置存储在一个内部Kafka topic中。

现在运行重新启动Kafka后第一次调用代码,Kafka还不知道消费者组并应用auto_offset_reset配置中提供的策略。在您的情况下,它从 earliest 可用提交中读取。第二次 运行 您的代码时,它不需要查看此策略,因为它已经知道消费者并且不会允许消费者再次使用该消息。

因此,如果您重新启动 Kafka,消费者的这种内部知识也会消失,并且再次应用 auto_offset_reset 策略。

请记住,这更像是一种 hack,不应该经常在生产系统上这样做,因为 consumerGroups 会闲置。

作为 sid 说明:您 console-consumer 每次 运行 都会创建一个新的消费者组。设置“--from-beginning”只是确保 auto_offset_reset 设置为 'earliest'.