有没有办法通过 API 重置 Kafka 消费者组的偏移量?
Is there a way to reset offsets of a Kafka consumer group through an API?
我有一个用例,其中有一个消费组正在消费消息。我想构建一个 API 来修改它的偏移量。因此,当使用偏移量调用端点时,我必须更改消费者组的偏移量。我正在使用 SpringBoot 并且消费者是使用 Spring Kafka 构建的。
提前致谢。
这是通过 CLI 的解决方案:
列出群组订阅的主题:
kafka-consumer-groups --bootstrap-server <kafkahost:port> --group <group_id> --describe
记下“CURRENT-OFFSET”和“LOG-END-OFFSET”下的值。 “CURRENT-OFFSET”是该消费者组当前在每个分区中的偏移量。
重置主题的消费者偏移量(预览):
kafka-consumer-groups --bootstrap-server <kafkahost:port> --group <group_id> --topic <topic_name> --reset-offsets --to-earliest
这将打印重置的预期结果,但实际上 运行 它不会。
重置主题的消费者偏移量(执行):
kafka-consumer-groups --bootstrap-server <kafkahost:port> --group <group_id> --topic <topic_name> --reset-offsets --to-earliest --execute
这将执行重置并将指定主题的消费者组偏移量重置回 0。
重复1检查是否重置成功
Spring for Apache Kafka 提供了一些方便的机制来执行查找,无论是在应用程序初始化期间,还是在之后的任何时候。
最简单的方法是让您的侦听器扩展 AbstractConsumerSeekAware
或实现 ConsumerSeekAware
。
要重置偏移量,代码必须通过组 ID 获取消费者、启动和停止。必须创建一个客户端(当消费者停止时)。创建客户端后,代码必须连接、重置偏移量,然后断开连接(如下所示)。重置后,必须重新启动消费者组。
下面的代码重置偏移量。关键是让消费者实例能够停止和启动消费者,因为如果消费者是 运行,它就无法重置。我们从命令行得到这个 运行,这很好。希望这对您有所帮助。
return kafka.getConsumer( args.groupId )
.then(consumerInstance => {
consumer = consumerInstance;
return consumer.run();
}).then(()=> {
return consumer.stop();
}).then(() => {
// reset kafka client offset
})).then(()=> {
return consumer.run();
}).
// reset kafka client offset
const { groupId, topic } = args;
const admin = createKafkaClient().admin();
return admin.connect()
.then(() =>
admin.resetOffsets({
groupId,
topic,
earliest: true,
})
.then(() => admin.disconnect();
我有一个用例,其中有一个消费组正在消费消息。我想构建一个 API 来修改它的偏移量。因此,当使用偏移量调用端点时,我必须更改消费者组的偏移量。我正在使用 SpringBoot 并且消费者是使用 Spring Kafka 构建的。 提前致谢。
这是通过 CLI 的解决方案:
列出群组订阅的主题:
kafka-consumer-groups --bootstrap-server <kafkahost:port> --group <group_id> --describe
记下“CURRENT-OFFSET”和“LOG-END-OFFSET”下的值。 “CURRENT-OFFSET”是该消费者组当前在每个分区中的偏移量。
重置主题的消费者偏移量(预览):
kafka-consumer-groups --bootstrap-server <kafkahost:port> --group <group_id> --topic <topic_name> --reset-offsets --to-earliest
这将打印重置的预期结果,但实际上 运行 它不会。
重置主题的消费者偏移量(执行):
kafka-consumer-groups --bootstrap-server <kafkahost:port> --group <group_id> --topic <topic_name> --reset-offsets --to-earliest --execute
这将执行重置并将指定主题的消费者组偏移量重置回 0。
重复1检查是否重置成功
Spring for Apache Kafka 提供了一些方便的机制来执行查找,无论是在应用程序初始化期间,还是在之后的任何时候。
最简单的方法是让您的侦听器扩展 AbstractConsumerSeekAware
或实现 ConsumerSeekAware
。
要重置偏移量,代码必须通过组 ID 获取消费者、启动和停止。必须创建一个客户端(当消费者停止时)。创建客户端后,代码必须连接、重置偏移量,然后断开连接(如下所示)。重置后,必须重新启动消费者组。
下面的代码重置偏移量。关键是让消费者实例能够停止和启动消费者,因为如果消费者是 运行,它就无法重置。我们从命令行得到这个 运行,这很好。希望这对您有所帮助。
return kafka.getConsumer( args.groupId )
.then(consumerInstance => {
consumer = consumerInstance;
return consumer.run();
}).then(()=> {
return consumer.stop();
}).then(() => {
// reset kafka client offset
})).then(()=> {
return consumer.run();
}).
// reset kafka client offset
const { groupId, topic } = args;
const admin = createKafkaClient().admin();
return admin.connect()
.then(() =>
admin.resetOffsets({
groupId,
topic,
earliest: true,
})
.then(() => admin.disconnect();