Spring-kafka支持只执行一次SeekToTimestamp
Spring-kafka support for only executing SeekToTimestamp once
我的用例是根据时间戳设置消费者组偏移量。
为此,我在 ConsumerSeekAware 的 onPartitionsAssigned() 方法中使用了 ConsumerSeekCallback 的 seekToTimestamp 方法。
现在,当我启动我的应用程序时,它会寻找我指定的时间戳,但在重新平衡期间,它会再次寻找该时间戳。
我希望仅当 ConsumerGroup Offset 小于该特定时间戳的偏移量时才会发生这种情况,如果它大于该时间戳,则不应查找。
我们有什么办法可以做到这一点吗?Spring-Kafka 为新的 ConsumerGroup 提供了一些侦听器,因此当创建新的消费者组时,它将根据时间戳调用搜索,否则将使用现有的偏移量?
public class KafkaConsumer implements ConsumerSeekAware {
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
long timestamp = 1623775969;
callback.seekToTimestamp(new ArrayList<>(assignments.keySet()), timestamp);
}
}
只需在您的实现中添加一个布尔字段 (boolean seeksDone;
);求后置真,假时才求
不过,如果您在第一次重新平衡时只获得分区 1 和 3,而在下一次重新平衡时获得分区 1、2、3、4,您必须决定该怎么做。
当然,如果您只有一个应用程序实例,这不是问题。但是,如果您需要在首次分配分区时查找每个分区,则必须跟踪每个分区的状态。
我的用例是根据时间戳设置消费者组偏移量。 为此,我在 ConsumerSeekAware 的 onPartitionsAssigned() 方法中使用了 ConsumerSeekCallback 的 seekToTimestamp 方法。
现在,当我启动我的应用程序时,它会寻找我指定的时间戳,但在重新平衡期间,它会再次寻找该时间戳。
我希望仅当 ConsumerGroup Offset 小于该特定时间戳的偏移量时才会发生这种情况,如果它大于该时间戳,则不应查找。
我们有什么办法可以做到这一点吗?Spring-Kafka 为新的 ConsumerGroup 提供了一些侦听器,因此当创建新的消费者组时,它将根据时间戳调用搜索,否则将使用现有的偏移量?
public class KafkaConsumer implements ConsumerSeekAware {
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
long timestamp = 1623775969;
callback.seekToTimestamp(new ArrayList<>(assignments.keySet()), timestamp);
}
}
只需在您的实现中添加一个布尔字段 (boolean seeksDone;
);求后置真,假时才求
不过,如果您在第一次重新平衡时只获得分区 1 和 3,而在下一次重新平衡时获得分区 1、2、3、4,您必须决定该怎么做。
当然,如果您只有一个应用程序实例,这不是问题。但是,如果您需要在首次分配分区时查找每个分区,则必须跟踪每个分区的状态。