如果我将 pollTimeout 设置为 1000ms,Kafka Consumer 会收到多少条消息

How many messages in Kafka Consumer come if I set pollTimeout to 1000ms

使用 Spring 批处理实现 Kafka。开发了 Spring 引导应用程序,我的 Kafka 生产者正在不断地生产消息。我想分批处理这些消息。但是当我触发工作时,工作是连续 运行。所以我决定在 KafkaItemReader 中添加 pollTimeout。这样我就可以停止工作了。但是在触发 Job 时会有多少消息传入 Kafka。如果我将 pollTimeout 设置为 1000ms,我无法在 google 中找到 KafkaItemReader 中将收到多少消息。

提示会很有帮助

@豆子 KafkaItemReader item() { return new kafkaItemBuilder().partitions(0).consumerproperties(prop).name(“reader”).savedata(true) 复制代码.topic(名称).pollTimeout(Duration.ofMillis(1000).build()}

属性是超时,不是记录限制。

您可以针对 max.poll.records 以及启动和停止消费者之间的时间段进行一些计算,但这只是一个估计值,而不是一个确切的数字,因为轮询超时只是一个上限等待最大轮询记录数

如果您想以编程方式计算已处理消息的数量,我建议获取偏移差或对消耗的记录计数求和。

批处理是针对固定的数据集。如果您的主题是连续的事件流,那么 Spring 批处理作业对您来说不是一个好的选择,流式解决方案更合适。 Spring Batch 期望您的 ItemReader 到 return null 当数据源耗尽时,但在您的情况下,数据源永远不会耗尽,这就是您的工作永远不会完成的原因.

超时 属性 实际上会使 reader return null 如果在此期间没有收到消息。