Kafka 消费者的第一个轮询不检索主题消息。有什么问题吗?
Kafka consumer's first poll does not retrieve the topic messages. What could be wrong?
我在 Docker 本地安装了一个简单的 Kafka 2.4.1 (Confluent 5.4.1) 运行。我使用了一个测试生产者和一个用Java编写的测试消费者。该代码在 GitHub.
中可用
单元测试做:
- 生产者向单分区主题生产一条消息
- 消费者订阅主题并轮询 Kafka 以获取可用消息
问题是:消费者的第一个 运行 将跳过主题中可用的已生成消息。 真正的问题是那些错过的消息丢失了(从消费者的角度来看:偏移量被移动到主题中的最新并且滞后为0 < - 这在Kafka Tool)
第一个 运行 之后的结果是:
-------------------------------------------------------
T E S T S
-------------------------------------------------------
Running com.example.TestKafkaProducer
Timestamp: Thu Mar 26 10:26:51 CET 2020
Offset: 0
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.686 sec
Running com.example.TestKafkaConsumer
Record count: 0
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.561 sec
Results :
Tests run: 2, Failures: 0, Errors: 0, Skipped: 0
第二次 运行 测试给出:
-------------------------------------------------------
T E S T S
-------------------------------------------------------
Running com.example.TestKafkaProducer
Timestamp: Thu Mar 26 10:28:08 CET 2020
Offset: 1
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.538 sec
Running com.example.TestKafkaConsumer
Record count: 1
offset = 1, key = static-key, value = this is the string message at Thu Mar 26 10:28:08 CET 2020
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.138 sec
Results :
Tests run: 2, Failures: 0, Errors: 0, Skipped: 0
我尝试修改不同的变体,结果总是一样的:
- 在生成第一条消息之前等待 Kafka "warm up"
- 在生产和消费之间等待一段时间
- 在第一次消费前产生多条消息
- 未经 Kafka 工具检查而生产和消费(以避免任何第 3 方未知干扰)
有时我还观察到第二个 运行 的消费者仍然错过了生成的事件。
查看您 GitHub 存储库中的代码,您似乎没有设置消费者配置 auto.offset.reset
。根据 documentation 此设置默认为 latest
。这意味着如果您的测试主题的 Broker 不知道消费者组,它只会等待新的传入消息。因此,TestConsumer 无法使用您的 Producer Test 之前 写入的消息。
这篇 documentation 很好地解释了 Kafka 中的消费者组概念。
我在 Docker 本地安装了一个简单的 Kafka 2.4.1 (Confluent 5.4.1) 运行。我使用了一个测试生产者和一个用Java编写的测试消费者。该代码在 GitHub.
中可用单元测试做:
- 生产者向单分区主题生产一条消息
- 消费者订阅主题并轮询 Kafka 以获取可用消息
问题是:消费者的第一个 运行 将跳过主题中可用的已生成消息。 真正的问题是那些错过的消息丢失了(从消费者的角度来看:偏移量被移动到主题中的最新并且滞后为0 < - 这在Kafka Tool)
第一个 运行 之后的结果是:
-------------------------------------------------------
T E S T S
-------------------------------------------------------
Running com.example.TestKafkaProducer
Timestamp: Thu Mar 26 10:26:51 CET 2020
Offset: 0
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.686 sec
Running com.example.TestKafkaConsumer
Record count: 0
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.561 sec
Results :
Tests run: 2, Failures: 0, Errors: 0, Skipped: 0
第二次 运行 测试给出:
-------------------------------------------------------
T E S T S
-------------------------------------------------------
Running com.example.TestKafkaProducer
Timestamp: Thu Mar 26 10:28:08 CET 2020
Offset: 1
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.538 sec
Running com.example.TestKafkaConsumer
Record count: 1
offset = 1, key = static-key, value = this is the string message at Thu Mar 26 10:28:08 CET 2020
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.138 sec
Results :
Tests run: 2, Failures: 0, Errors: 0, Skipped: 0
我尝试修改不同的变体,结果总是一样的:
- 在生成第一条消息之前等待 Kafka "warm up"
- 在生产和消费之间等待一段时间
- 在第一次消费前产生多条消息
- 未经 Kafka 工具检查而生产和消费(以避免任何第 3 方未知干扰)
有时我还观察到第二个 运行 的消费者仍然错过了生成的事件。
查看您 GitHub 存储库中的代码,您似乎没有设置消费者配置 auto.offset.reset
。根据 documentation 此设置默认为 latest
。这意味着如果您的测试主题的 Broker 不知道消费者组,它只会等待新的传入消息。因此,TestConsumer 无法使用您的 Producer Test 之前 写入的消息。
这篇 documentation 很好地解释了 Kafka 中的消费者组概念。