如果我将 pollTimeout 设置为 1000ms,Kafka Consumer 会收到多少条消息
How many messages in Kafka Consumer come if I set pollTimeout to 1000ms
使用 Spring 批处理实现 Kafka。开发了 Spring 引导应用程序,我的 Kafka 生产者正在不断地生产消息。我想分批处理这些消息。但是当我触发工作时,工作是连续 运行。所以我决定在 KafkaItemReader 中添加 pollTimeout。这样我就可以停止工作了。但是在触发 Job 时会有多少消息传入 Kafka。如果我将 pollTimeout 设置为 1000ms,我无法在 google 中找到 KafkaItemReader 中将收到多少消息。
提示会很有帮助
@豆子
KafkaItemReader item() { return new kafkaItemBuilder().partitions(0).consumerproperties(prop).name(“reader”).savedata(true) 复制代码.topic(名称).pollTimeout(Duration.ofMillis(1000).build()}
属性是超时,不是记录限制。
您可以针对 max.poll.records 以及启动和停止消费者之间的时间段进行一些计算,但这只是一个估计值,而不是一个确切的数字,因为轮询超时只是一个上限等待最大轮询记录数
如果您想以编程方式计算已处理消息的数量,我建议获取偏移差或对消耗的记录计数求和。
批处理是针对固定的数据集。如果您的主题是连续的事件流,那么 Spring 批处理作业对您来说不是一个好的选择,流式解决方案更合适。 Spring Batch 期望您的 ItemReader
到 return null
当数据源耗尽时,但在您的情况下,数据源永远不会耗尽,这就是您的工作永远不会完成的原因.
超时 属性 实际上会使 reader return null
如果在此期间没有收到消息。
使用 Spring 批处理实现 Kafka。开发了 Spring 引导应用程序,我的 Kafka 生产者正在不断地生产消息。我想分批处理这些消息。但是当我触发工作时,工作是连续 运行。所以我决定在 KafkaItemReader 中添加 pollTimeout。这样我就可以停止工作了。但是在触发 Job 时会有多少消息传入 Kafka。如果我将 pollTimeout 设置为 1000ms,我无法在 google 中找到 KafkaItemReader 中将收到多少消息。
提示会很有帮助
@豆子
KafkaItemReader
属性是超时,不是记录限制。
您可以针对 max.poll.records 以及启动和停止消费者之间的时间段进行一些计算,但这只是一个估计值,而不是一个确切的数字,因为轮询超时只是一个上限等待最大轮询记录数
如果您想以编程方式计算已处理消息的数量,我建议获取偏移差或对消耗的记录计数求和。
批处理是针对固定的数据集。如果您的主题是连续的事件流,那么 Spring 批处理作业对您来说不是一个好的选择,流式解决方案更合适。 Spring Batch 期望您的 ItemReader
到 return null
当数据源耗尽时,但在您的情况下,数据源永远不会耗尽,这就是您的工作永远不会完成的原因.
超时 属性 实际上会使 reader return null
如果在此期间没有收到消息。