kafka-node 消费者收到 offsetOutOfRange 错误

kafka-node consumer receives offsetOutOfRange error

我正在使用 kafka-node(kafka 的节点客户端),使用消费者检索有关主题的消息。不幸的是,我收到了 "offsetOutOfRange" 条件(调用了 offsetOutOfRange 回调)。我的应用程序运行良好,直到消费者明显落后于生产者,在最早和最新的偏移量之间留下了很大的差距。在这一点上,我(可能是错误的)假设消费者将能够继续接收消息(并希望赶上生产者)。

我的kafka消费者客户端代码如下:

:
:
var kafka = require('kafka-node');

var zookeeper = "10.0.1.201:2181";
var id = "embClient";

var Consumer = kafka.Consumer;
var client = new kafka.Client(zookeeper, id);
var consumer = new Consumer( client, [ { topic: "test", partition: 0 } ], { autoCommit: false } );

consumer.on('error', [error callback...]);

consumer.on('offsetOutOfRange', [offset error callback...]);

consumer.on('message', [message callback...]);
:
:

我是不是做错了什么,还是遗漏了什么?

如果没有,我有几个问题:

(a) 是否有公认的 "best" 方式编写客户端以优雅地处理这种情况?

(b) 为什么会提出这个条件? (我假设客户端应该能够从中断的地方继续阅读消息,最终(理想情况下)赶上...)

(c) 我是否需要编写 code/logic 来处理这种情况,并显式重新定位要读取的消费者偏移量? (这个好像有点麻烦)...

感谢任何帮助。

我认为该应用可能会尝试读取 Kafka 中不再可用的消息。 Kafka 根据 log.retention.* 属性删除旧消息。假设您已向 Kafka 发送了 1000 条消息。由于保留,Kafka 删除了前 500 条消息。如果您的应用程序尝试读取消息 350,它将失败并引发 offsetOutOfRange 错误。这可能是因为您的消费者速度太慢,以至于 Kafka 代理在您的消费者可以处理消息之前就已经删除了消息。或者您的消费者崩溃了,但最后处理的消息的偏移量保存在某处。

您可以使用 Offset class 检索 latest/earliest 可用偏移量(参见方法 fetch)并更新消费者的偏移量。我们使用这种方法。

一般来说,当这种情况发生时,很难判断应用程序应该做什么,因为很明显有些地方出了问题。

希望对您有所帮助, 卢卡斯