When/why kafka分区可以unpause "by itself"吗?

When/why can kafka partition unpause "by itself"?

我有这个卡夫卡消费者:

new ReactiveKafkaConsumerTemplate<>(createReceivingOptions())

它愉快地处理消息,我设置

max-poll-records=1

这样事情就不会对我来说太快了。我可以在

上的 poll 方法中通过记录断点进行验证
final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);

轮询返回了多少条记录,是的,是一条。然后我要求它暂停所有分配的分区。在日志中可以看到它起作用了!

o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=Testing, groupId=Testing] Pausing partitions [TestTopic-0]

从那时起我可以看到,该轮询仅获得 0 条记录以及此日志:

Skipping fetching records for assigned partition TestTopic-0 because it is paused

好的,很好,很有效!但是等等,为什么我的整个主题都在处理?

然后我发现,在某个时候还有这个日志:

Consumer clientId=Testing, groupId=Testing] Resuming partitions [TestTopic-0]

什么?谁在打电话?然后我还发现,到处都有多个暂停请求,而不仅仅是我实际调用的那个。

暂停以某种方式被反应式使用并且不能手动使用?或者有人解释为什么 …clients.consumer.KafkaConsumer 一直在自己做 pause/resume 主题,并因此而手动暂停吗?

查看 ConsumerEventLoop 代码后,反应式客户端在内部使用 pause/resume 来处理背压 - 当下游无法接收更多数据时,他会暂停所有分配的分区并无条件恢复当背压减轻时。

在我看来,它需要跟踪是否由于背压而暂停,并且只有在这种情况下才恢复。

它好像在 this commit 之前就这样做过。

也许您可以使用背压来强制暂停?