我们应该使用 max.poll.records 还是 max.poll.interval.ms 来处理在 kafka 消费者中处理时间更长的记录?

Should we use max.poll.records or max.poll.interval.ms to handle records that take longer to process in kafka consumer?

我想了解在 kafka 消费者中处理需要更长时间处理的记录的更好选择是什么?我 运行 很少测试来理解这一点,并观察到我们可以通过修改 max.poll.recordsmax.poll.interval.ms.

来控制它

现在我的问题是,选择什么更好?请提出建议。

max.poll.records 简单地定义了单次调用 poll().

返回的最大记录数

现在 max.poll.interval.ms 定义了调用 poll() 之间的延迟。

max.poll.interval.ms: The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member. For consumers using a non-null group.instance.id which reach this timeout, partitions will not be immediately reassigned. Instead, the consumer will stop sending heartbeats and partitions will be reassigned after expiration of session.timeout.ms. This mirrors the behavior of a static consumer which has shutdown.


我相信您可以调整两者以获得预期的行为。例如,您可以计算消息的平均处理时间。如果平均处理时间为 1 秒,而您有 max.poll.records=100,那么您应该为轮询间隔留出大约 100 秒以上的时间。

如果您的处理速度很慢,因此想避免重新平衡,那么调整两者都可以实现。但是,扩展 max.poll.interval.ms 以允许轮询之间的间隔更长确实会产生一些副作用。

每个消费者只使用2个线程——轮询线程和心跳线程。

后者让组知道您的应用程序仍然存在,因此可以在 max.poll.interval.ms 到期之前触发重新平衡,它还会在您处理之前轮询的批次时预取记录。

轮询线程在组通信方面完成所有其他工作,因此在轮询方法期间,您会发现其他地方是否触发了重新平衡,您会发现分区领导者是否已经死亡,因此需要刷新元数据。这意味着如果你允许轮询之间有更长的间隔,那么整个组对变化的响应速度就会变慢(例如,在重新平衡之后没有消费者开始接收消息,直到他们都收到了他们的新分区 - 如果重新平衡发生在一个之后消费者已经开始处理一个批次 10 分钟,那么所有消费者将至少停留那么长时间。

因此,对于消息处理速度较慢的情况下响应更快的组,您应该选择减少每批中提取的记录。