Kafka Storm Spout:获取偏移量超出范围的获取请求

Kafka Storm Spout: Got fetch request with offset out of range

我们的 Storm 拓扑中有一个场景,其中 KafkaSpouts 无法使用来自主题的任何消息。 Spout 连续记录相同的 WARN 消息:

Got fetch request with offset out of range

...
2016-10-26 11:11:31.070 o.a.s.k.KafkaUtils [WARN] Partition{host=somehost.org:9092, topic=my-topic, partition=0} Got fetch request with offset out of range: [3]
2016-10-26 11:11:31.078 o.a.s.k.KafkaUtils [WARN] Partition{host=somehost.org:9092, topic=my-topic, partition=0} Got fetch request with offset out of range: [3]
2016-10-26 11:11:31.084 o.a.s.k.KafkaUtils [WARN] Partition{host=somehost.org:9092, topic=my-topic, partition=0} Got fetch request with offset out of range: [3]
2016-10-26 11:11:31.098 o.a.s.k.KafkaUtils [WARN] Partition{host=somehost.org:9092, topic=my-topic, partition=0} Got fetch request with offset out of range: [3]
2016-10-26 11:11:31.104 o.a.s.k.KafkaUtils [WARN] Partition{host=somehost.org:9092, topic=my-topic, partition=0} Got fetch request with offset out of range: [3]
2016-10-26 11:11:31.111 o.a.s.k.KafkaUtils [WARN] Partition{host=somehost.org:9092, topic=my-topic, partition=0} Got fetch request with offset out of range: [3]
...

Spout 配置为从 zookeeper 读取最后提交的偏移量,并且在这种情况下该偏移量大于 Kafka 中的最新消息偏移量。我们也在调查主题偏移量重置的原因。

目前我们通过观察风暴日志中的超出范围警告来解决问题,删除 zookeeper 偏移条目,然后重新部署拓扑。

如果提交了无效的偏移量,则使用客户端配置 "auto.offset.reset"。它接受值 "smallest" 和 "largest"。如果未设置该值,则会抛出异常(如您的情况)。

对于 KafkaSpout,您可以通过变量 KafkaConfig#startOffsetTime 设置此值,将其设置为 kafka.api.OffsetRequest.EarliestTime()kafka.api.OffsetRequest.LatestTime()

http://storm.apache.org/releases/1.0.2/storm-kafka.html

就我而言,发生这种情况是因为我重新创建了我的 KafkaSpout 订阅的 Kafka 主题。

特定分区的偏移量保存在 Zookeeper 中,如果删除主题然后再次创建,您将必须从 Zookeeper 中手动删除偏移量信息。

只需打开Zookeeper CLI,然后删除属于您KafkaSpout 的消费者'group-id' 的'node' 所在的路径。 如需帮助,请参考 https://www.tutorialspoint.com/zookeeper/zookeeper_cli.htm