有没有办法通过 API 重置 Kafka 消费者组的偏移量?

Is there a way to reset offsets of a Kafka consumer group through an API?

我有一个用例,其中有一个消费组正在消费消息。我想构建一个 API 来修改它的偏移量。因此,当使用偏移量调用端点时,我必须更改消费者组的偏移量。我正在使用 SpringBoot 并且消费者是使用 Spring Kafka 构建的。 提前致谢。

这是通过 CLI 的解决方案:

列出群组订阅的主题:

kafka-consumer-groups --bootstrap-server <kafkahost:port> --group <group_id> --describe

记下“CURRENT-OFFSET”和“LOG-END-OFFSET”下的值。 “CURRENT-OFFSET”是该消费者组当前在每个分区中的偏移量。

重置主题的消费者偏移量(预览):

kafka-consumer-groups --bootstrap-server <kafkahost:port> --group <group_id> --topic <topic_name> --reset-offsets --to-earliest

这将打印重置的预期结果,但实际上 运行 它不会。

重置主题的消费者偏移量(执行):

kafka-consumer-groups --bootstrap-server <kafkahost:port> --group <group_id> --topic <topic_name> --reset-offsets --to-earliest --execute

这将执行重置并将指定主题的消费者组偏移量重置回 0。

重复1检查是否重置成功

Spring for Apache Kafka 提供了一些方便的机制来执行查找,无论是在应用程序初始化期间,还是在之后的任何时候。

最简单的方法是让您的侦听器扩展 AbstractConsumerSeekAware 或实现 ConsumerSeekAware

要重置偏移量,代码必须通过组 ID 获取消费者、启动和停止。必须创建一个客户端(当消费者停止时)。创建客户端后,代码必须连接、重置偏移量,然后断开连接(如下所示)。重置后,必须重新启动消费者组。

下面的代码重置偏移量。关键是让消费者实例能够停止和启动消费者,因为如果消费者是 运行,它就无法重置。我们从命令行得到这个 运行,这很好。希望这对您有所帮助。

return kafka.getConsumer( args.groupId )
    .then(consumerInstance => {
        consumer = consumerInstance;
        return consumer.run();
    }).then(()=> {
        return consumer.stop();
    }).then(() => {
         // reset kafka client offset
    })).then(()=> {
        return consumer.run();
    }).

// reset kafka client offset
const { groupId, topic } = args;

const admin = createKafkaClient().admin();

return admin.connect()
    .then(() =>
        admin.resetOffsets({
            groupId,
            topic,
            earliest: true,
    })
    .then(() => admin.disconnect();