如何重置 Kafka 偏移量以匹配尾部位置?
How to reset Kafka offsets to match tail position?
我们将 Storm 与 Kafka 和 ZooKeeper 结合使用。我们遇到过这样一种情况,我们不得不删除一些主题并用不同的名称重新创建它们。我们的 Kafka spouts 保持不变,除了现在从新的主题名称中读取。然而,现在 spouts 在尝试读取新主题时使用旧主题分区的偏移量。因此,my-topic-name 分区 0 的尾部位置将为 500,但偏移量将类似于 10000。
有没有办法重新设置偏移位置使其与主题的尾部匹配?
有多个选项(因为 Storm 的 KafkaSpout
没有提供任何 API 来定义起始偏移量)。
- 如果你想从日志的尾部消费你应该删除旧的偏移量
- 取决于你的 Kafka 版本
- (pre 0.9) 你可以操纵 ZK(这有点棘手)
- (0.9+) 或者您尝试从主题
__consumer_offsets
中删除偏移量(这也很棘手,可能也会删除您想要保留的其他偏移量)
- 如果没有偏移量,您可以使用自动偏移量重置策略 "latest" 或 "largest"(取决于您的 Kafka 版本)重新启动 spout
- 作为替代方案(我会推荐),您可以编写一个小型客户端应用程序,使用
seek()
以您需要的方式操纵偏移量和 commit()
偏移量。此客户端必须使用与您 KafkaSpout
相同的组 ID,并且必须订阅相同的主题。此外,您需要确保此客户端应用程序是 运行 单个消费者组成员,以便分配所有分区。
- 为此,您可以找到日志的末尾并提交
- 或者您提交了一个无效的偏移量(如 -1)并依赖自动偏移量重置配置"latest" 或 "largest"(取决于您的 Kafka 版本)
对于 Kafka Streams,有一个 "Application Reset Tool" 可以做类似的事情来操纵提交的偏移量。如果你想了解一些细节,你可以阅读这篇博客 post http://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/
(免责声明:我是 post 的作者,它是关于 Kafka Streams 的——尽管如此,底层的偏移操作思想是相同的)
我们将 Storm 与 Kafka 和 ZooKeeper 结合使用。我们遇到过这样一种情况,我们不得不删除一些主题并用不同的名称重新创建它们。我们的 Kafka spouts 保持不变,除了现在从新的主题名称中读取。然而,现在 spouts 在尝试读取新主题时使用旧主题分区的偏移量。因此,my-topic-name 分区 0 的尾部位置将为 500,但偏移量将类似于 10000。
有没有办法重新设置偏移位置使其与主题的尾部匹配?
有多个选项(因为 Storm 的 KafkaSpout
没有提供任何 API 来定义起始偏移量)。
- 如果你想从日志的尾部消费你应该删除旧的偏移量
- 取决于你的 Kafka 版本
- (pre 0.9) 你可以操纵 ZK(这有点棘手)
- (0.9+) 或者您尝试从主题
__consumer_offsets
中删除偏移量(这也很棘手,可能也会删除您想要保留的其他偏移量)
- 如果没有偏移量,您可以使用自动偏移量重置策略 "latest" 或 "largest"(取决于您的 Kafka 版本)重新启动 spout
- 取决于你的 Kafka 版本
- 作为替代方案(我会推荐),您可以编写一个小型客户端应用程序,使用
seek()
以您需要的方式操纵偏移量和commit()
偏移量。此客户端必须使用与您KafkaSpout
相同的组 ID,并且必须订阅相同的主题。此外,您需要确保此客户端应用程序是 运行 单个消费者组成员,以便分配所有分区。- 为此,您可以找到日志的末尾并提交
- 或者您提交了一个无效的偏移量(如 -1)并依赖自动偏移量重置配置"latest" 或 "largest"(取决于您的 Kafka 版本)
对于 Kafka Streams,有一个 "Application Reset Tool" 可以做类似的事情来操纵提交的偏移量。如果你想了解一些细节,你可以阅读这篇博客 post http://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/
(免责声明:我是 post 的作者,它是关于 Kafka Streams 的——尽管如此,底层的偏移操作思想是相同的)