如何删除 Kafka 消费者组以重置偏移量?

How do I delete a Kafka Consumer Group to reset offsets?

我想删除一个Kakfa消费者组,这样当应用程序创建一个消费者并订阅一个主题时,它可以从主题数据的开头开始。

这是使用当前最新的 Confluent Platform 3.1.2 的单节点开发 vm,它使用 Kafka 0.10.1.1。

我试试正常的语法:

sudo /usr/bin/kafka-consumer-groups --new-consumer --bootstrap-server localhost:9092 --delete --group my_consumer_group

我收到错误:

Option [delete] is only valid with [zookeeper]. Note that there's no need to delete group metadata for the new consumer as the group is deleted when the last committed offset for that group expires.

如果我尝试 zookeeper 变体:

sudo /usr/bin/kafka-consumer-groups --zookeeper localhost:2181 --delete --group my_consumer_group

我得到:

Delete for group my_consumer_group failed because group does not exist.

如果我使用 "old" 消费者列出,我看不到我的消费者组(或任何其他消费者组)

sudo /usr/bin/kafka-consumer-groups --zookeeper localhost:2181 --list

如果我使用 "new" 消费者列表,我可以看到我的消费者组,但显然我无法删除它:

sudo /usr/bin/kafka-consumer-groups --new-consumer --bootstrap-server localhost:9092 --list

使用 Kafka 0.10.2 升级到刚刚发布的 Confluent Platform 3.2 解决了我的根本问题。当我删除主题时,偏移量信息现在已正确重置。所以当我创建同名主题时,消费者从新数据的开头开始。

我仍然无法使用kafka-consumer-groups工具删除新样式的消费者组,但我的根本问题已解决。

在 Kafka 0.10.2 之前,有 hack,但没有干净的解决方案。

如果使用Java客户端,可以先获取起始偏移

TopicPartition partition = new TopicPartition("YOUR_TOPIC", YOUR_PARTITION);
Map<TopicPartition, Long> map = consumer.beginningOffsets(Collections.singleton(partition));

以及消费者用来开始处理的偏移量,(如果不删除消费者组)。

Long committedOffset = consumer.committed(partition).offset();

现在,如果您认为从 committedOffset 开始没问题,只需轮询记录即可。 如果你想要开始偏移量, consumer.seek(分区, map.get(分区));

在 Kafka 0.11(或 Confluent 3.3)中,您可以重置任何现有消费者组的偏移量,而无需删除主题。事实上,您可以将偏移量更改为任何绝对偏移值或时间戳或任何相对位置。

这些新功能都是通过 kafka-consumer-groups 命令行工具上的新 --reset-offsets 标志添加的。

在此处查看 KIP-122 详细信息https://cwiki.apache.org/confluence/display/KAFKA/KIP-122%3A+Add+Reset+Consumer+Group+Offsets+tooling

这可以用 Kafka 1 来完成。1.x。来自文档:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group my-group --group my-other-group

你也可以在不删除整个消费者组的情况下重置单个主题的偏移量:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-group --topic my-topic --reset-offsets --to-earliest --execute