Spring-kafka支持只执行一次SeekToTimestamp

Spring-kafka support for only executing SeekToTimestamp once

我的用例是根据时间戳设置消费者组偏移量。 为此,我在 ConsumerSeekAware 的 onPartitionsAssigned() 方法中使用了 ConsumerSeekCallback 的 seekToTimestamp 方法。

现在,当我启动我的应用程序时,它会寻找我指定的时间戳,但在重新平衡期间,它会再次寻找该时间戳。

我希望仅当 ConsumerGroup Offset 小于该特定时间戳的偏移量时才会发生这种情况,如果它大于该时间戳,则不应查找。

我们有什么办法可以做到这一点吗?Spring-Kafka 为新的 ConsumerGroup 提供了一些侦听器,因此当创建新的消费者组时,它将根据时间戳调用搜索,否则将使用现有的偏移量?

public class KafkaConsumer implements ConsumerSeekAware {

@Override
  public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
 long timestamp = 1623775969;
    callback.seekToTimestamp(new ArrayList<>(assignments.keySet()), timestamp);
}
}

只需在您的实现中添加一个布尔字段 (boolean seeksDone;);求后置真,假时才求

不过,如果您在第一次重新平衡时只获得分区 1 和 3,而在下一次重新平衡时获得分区 1、2、3、4,您必须决定该怎么做。

当然,如果您只有一个应用程序实例,这不是问题。但是,如果您需要在首次分配分区时查找每个分区,则必须跟踪每个分区的状态。