When/why kafka分区可以unpause "by itself"吗?
When/why can kafka partition unpause "by itself"?
我有这个卡夫卡消费者:
new ReactiveKafkaConsumerTemplate<>(createReceivingOptions())
它愉快地处理消息,我设置
max-poll-records=1
这样事情就不会对我来说太快了。我可以在
上的 poll
方法中通过记录断点进行验证
final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
轮询返回了多少条记录,是的,是一条。然后我要求它暂停所有分配的分区。在日志中可以看到它起作用了!
o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=Testing, groupId=Testing] Pausing partitions [TestTopic-0]
从那时起我可以看到,该轮询仅获得 0 条记录以及此日志:
Skipping fetching records for assigned partition TestTopic-0 because it is paused
好的,很好,很有效!但是等等,为什么我的整个主题都在处理?
然后我发现,在某个时候还有这个日志:
Consumer clientId=Testing, groupId=Testing] Resuming partitions [TestTopic-0]
什么?谁在打电话?然后我还发现,到处都有多个暂停请求,而不仅仅是我实际调用的那个。
暂停以某种方式被反应式使用并且不能手动使用?或者有人解释为什么 …clients.consumer.KafkaConsumer
一直在自己做 pause/resume 主题,并因此而手动暂停吗?
查看 ConsumerEventLoop
代码后,反应式客户端在内部使用 pause/resume
来处理背压 - 当下游无法接收更多数据时,他会暂停所有分配的分区并无条件恢复当背压减轻时。
在我看来,它需要跟踪是否由于背压而暂停,并且只有在这种情况下才恢复。
它好像在 this commit 之前就这样做过。
也许您可以使用背压来强制暂停?
我有这个卡夫卡消费者:
new ReactiveKafkaConsumerTemplate<>(createReceivingOptions())
它愉快地处理消息,我设置
max-poll-records=1
这样事情就不会对我来说太快了。我可以在
上的poll
方法中通过记录断点进行验证
final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
轮询返回了多少条记录,是的,是一条。然后我要求它暂停所有分配的分区。在日志中可以看到它起作用了!
o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=Testing, groupId=Testing] Pausing partitions [TestTopic-0]
从那时起我可以看到,该轮询仅获得 0 条记录以及此日志:
Skipping fetching records for assigned partition TestTopic-0 because it is paused
好的,很好,很有效!但是等等,为什么我的整个主题都在处理?
然后我发现,在某个时候还有这个日志:
Consumer clientId=Testing, groupId=Testing] Resuming partitions [TestTopic-0]
什么?谁在打电话?然后我还发现,到处都有多个暂停请求,而不仅仅是我实际调用的那个。
暂停以某种方式被反应式使用并且不能手动使用?或者有人解释为什么 …clients.consumer.KafkaConsumer
一直在自己做 pause/resume 主题,并因此而手动暂停吗?
查看 ConsumerEventLoop
代码后,反应式客户端在内部使用 pause/resume
来处理背压 - 当下游无法接收更多数据时,他会暂停所有分配的分区并无条件恢复当背压减轻时。
在我看来,它需要跟踪是否由于背压而暂停,并且只有在这种情况下才恢复。
它好像在 this commit 之前就这样做过。
也许您可以使用背压来强制暂停?