Kafka 消费者的第一个轮询不检索主题消息。有什么问题吗?

Kafka consumer's first poll does not retrieve the topic messages. What could be wrong?

我在 Docker 本地安装了一个简单的 Kafka 2.4.1 (Confluent 5.4.1) 运行。我使用了一个测试生产者和一个用Java编写的测试消费者。该代码在 GitHub.

中可用

单元测试做:

问题是:消费者的第一个 运行 将跳过主题中可用的已生成消息。 真正的问题是那些错过的消息丢失了(从消费者的角度来看:偏移量被移动到主题中的最新并且滞后为0 < - 这在Kafka Tool)

第一个 运行 之后的结果是:

-------------------------------------------------------
 T E S T S
-------------------------------------------------------
Running com.example.TestKafkaProducer
Timestamp: Thu Mar 26 10:26:51 CET 2020
Offset: 0
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.686 sec
Running com.example.TestKafkaConsumer
Record count: 0
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.561 sec

Results :

Tests run: 2, Failures: 0, Errors: 0, Skipped: 0

第二次 运行 测试给出:

-------------------------------------------------------
 T E S T S
-------------------------------------------------------
Running com.example.TestKafkaProducer
Timestamp: Thu Mar 26 10:28:08 CET 2020
Offset: 1
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.538 sec
Running com.example.TestKafkaConsumer
Record count: 1
offset = 1, key = static-key, value = this is the string message at Thu Mar 26 10:28:08 CET 2020
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.138 sec

Results :

Tests run: 2, Failures: 0, Errors: 0, Skipped: 0

我尝试修改不同的变体,结果总是一样的:

有时我还观察到第二个 运行 的消费者仍然错过了生成的事件。

查看您 GitHub 存储库中的代码,您似乎没有设置消费者配置 auto.offset.reset。根据 documentation 此设置默认为 latest。这意味着如果您的测试主题的 Broker 不知道消费者组,它只会等待新的传入消息。因此,TestConsumer 无法使用您的 Producer Test 之前 写入的消息。

这篇 documentation 很好地解释了 Kafka 中的消费者组概念。