如何删除 Kafka 消费者组以重置偏移量?
How do I delete a Kafka Consumer Group to reset offsets?
我想删除一个Kakfa消费者组,这样当应用程序创建一个消费者并订阅一个主题时,它可以从主题数据的开头开始。
这是使用当前最新的 Confluent Platform 3.1.2 的单节点开发 vm,它使用 Kafka 0.10.1.1。
我试试正常的语法:
sudo /usr/bin/kafka-consumer-groups --new-consumer --bootstrap-server localhost:9092 --delete --group my_consumer_group
我收到错误:
Option [delete] is only valid with [zookeeper]. Note that there's no need to delete group metadata for the new consumer as the group is deleted when the last committed offset for that group expires.
如果我尝试 zookeeper 变体:
sudo /usr/bin/kafka-consumer-groups --zookeeper localhost:2181 --delete --group my_consumer_group
我得到:
Delete for group my_consumer_group failed because group does not exist.
如果我使用 "old" 消费者列出,我看不到我的消费者组(或任何其他消费者组)
sudo /usr/bin/kafka-consumer-groups --zookeeper localhost:2181 --list
如果我使用 "new" 消费者列表,我可以看到我的消费者组,但显然我无法删除它:
sudo /usr/bin/kafka-consumer-groups --new-consumer --bootstrap-server localhost:9092 --list
使用 Kafka 0.10.2 升级到刚刚发布的 Confluent Platform 3.2 解决了我的根本问题。当我删除主题时,偏移量信息现在已正确重置。所以当我创建同名主题时,消费者从新数据的开头开始。
我仍然无法使用kafka-consumer-groups
工具删除新样式的消费者组,但我的根本问题已解决。
在 Kafka 0.10.2 之前,有 hack,但没有干净的解决方案。
如果使用Java客户端,可以先获取起始偏移
TopicPartition partition = new TopicPartition("YOUR_TOPIC", YOUR_PARTITION);
Map<TopicPartition, Long> map = consumer.beginningOffsets(Collections.singleton(partition));
以及消费者用来开始处理的偏移量,(如果不删除消费者组)。
Long committedOffset = consumer.committed(partition).offset();
现在,如果您认为从 committedOffset
开始没问题,只需轮询记录即可。
如果你想要开始偏移量,
consumer.seek(分区, map.get(分区));
在 Kafka 0.11(或 Confluent 3.3)中,您可以重置任何现有消费者组的偏移量,而无需删除主题。事实上,您可以将偏移量更改为任何绝对偏移值或时间戳或任何相对位置。
这些新功能都是通过 kafka-consumer-groups
命令行工具上的新 --reset-offsets
标志添加的。
在此处查看 KIP-122 详细信息https://cwiki.apache.org/confluence/display/KAFKA/KIP-122%3A+Add+Reset+Consumer+Group+Offsets+tooling
这可以用 Kafka 1 来完成。1.x。来自文档:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group my-group --group my-other-group
你也可以在不删除整个消费者组的情况下重置单个主题的偏移量:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-group --topic my-topic --reset-offsets --to-earliest --execute
我想删除一个Kakfa消费者组,这样当应用程序创建一个消费者并订阅一个主题时,它可以从主题数据的开头开始。
这是使用当前最新的 Confluent Platform 3.1.2 的单节点开发 vm,它使用 Kafka 0.10.1.1。
我试试正常的语法:
sudo /usr/bin/kafka-consumer-groups --new-consumer --bootstrap-server localhost:9092 --delete --group my_consumer_group
我收到错误:
Option [delete] is only valid with [zookeeper]. Note that there's no need to delete group metadata for the new consumer as the group is deleted when the last committed offset for that group expires.
如果我尝试 zookeeper 变体:
sudo /usr/bin/kafka-consumer-groups --zookeeper localhost:2181 --delete --group my_consumer_group
我得到:
Delete for group my_consumer_group failed because group does not exist.
如果我使用 "old" 消费者列出,我看不到我的消费者组(或任何其他消费者组)
sudo /usr/bin/kafka-consumer-groups --zookeeper localhost:2181 --list
如果我使用 "new" 消费者列表,我可以看到我的消费者组,但显然我无法删除它:
sudo /usr/bin/kafka-consumer-groups --new-consumer --bootstrap-server localhost:9092 --list
使用 Kafka 0.10.2 升级到刚刚发布的 Confluent Platform 3.2 解决了我的根本问题。当我删除主题时,偏移量信息现在已正确重置。所以当我创建同名主题时,消费者从新数据的开头开始。
我仍然无法使用kafka-consumer-groups
工具删除新样式的消费者组,但我的根本问题已解决。
在 Kafka 0.10.2 之前,有 hack,但没有干净的解决方案。
如果使用Java客户端,可以先获取起始偏移
TopicPartition partition = new TopicPartition("YOUR_TOPIC", YOUR_PARTITION);
Map<TopicPartition, Long> map = consumer.beginningOffsets(Collections.singleton(partition));
以及消费者用来开始处理的偏移量,(如果不删除消费者组)。
Long committedOffset = consumer.committed(partition).offset();
现在,如果您认为从 committedOffset
开始没问题,只需轮询记录即可。
如果你想要开始偏移量,
consumer.seek(分区, map.get(分区));
在 Kafka 0.11(或 Confluent 3.3)中,您可以重置任何现有消费者组的偏移量,而无需删除主题。事实上,您可以将偏移量更改为任何绝对偏移值或时间戳或任何相对位置。
这些新功能都是通过 kafka-consumer-groups
命令行工具上的新 --reset-offsets
标志添加的。
在此处查看 KIP-122 详细信息https://cwiki.apache.org/confluence/display/KAFKA/KIP-122%3A+Add+Reset+Consumer+Group+Offsets+tooling
这可以用 Kafka 1 来完成。1.x。来自文档:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group my-group --group my-other-group
你也可以在不删除整个消费者组的情况下重置单个主题的偏移量:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-group --topic my-topic --reset-offsets --to-earliest --execute