Difference between session.timeout.ms and max.poll.interval.ms for Kafka

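AFAIK, max.poll.interval.ms was introduced in Kafka 0.10.1. However, it is still unclear when both session.timeout.ms and max.poll.interval.ms come into play. Consider the case where the heartbeat thread is not responding, but the processing thread is still processing records because its timeout (max.poll.interval.ms) is set to a higher value. What exactly happens once the heartbeat thread has been down for longer than session.timeout.ms? In a POC I observed that the consumer rebalance does not happen until max.poll.interval.ms is reached, so to me session.timeout.ms seems redundant.
A similar question has been posted, but it does not answer this one.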

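session.timeout.ms is used to detect consumer failures through the heartbeat mechanism. The consumer's heartbeat thread must send a heartbeat to the broker before session.timeout.ms expires. Otherwise, Kafka considers the consumer dead and triggers a rebalance.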

heartbeat.interval.ms: The expected time between heartbeats to the consumer coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the consumer's session stays active and to facilitate rebalancing when new consumers join or leave the group.

session.timeout.ms: The timeout used to detect client failures when using Kafka's group management facility. The client sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove this client from the group and initiate a rebalance.
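As a rough illustration of how these two settings relate, here is a minimal sketch that builds the two properties and checks the rule of thumb from the Kafka documentation that heartbeat.interval.ms should typically be no higher than one third of session.timeout.ms. The class and method names are hypothetical, and the values 10000 and 3000 are illustrative rather than recommendations.

```java
import java.util.Properties;

// Hypothetical helper, not part of the Kafka client library.
class HeartbeatConfigSketch {

    // Builds the two heartbeat-related consumer properties as plain strings.
    public static Properties heartbeatProps(int sessionTimeoutMs, int heartbeatIntervalMs) {
        Properties props = new Properties();
        props.setProperty("session.timeout.ms", Integer.toString(sessionTimeoutMs));
        props.setProperty("heartbeat.interval.ms", Integer.toString(heartbeatIntervalMs));
        return props;
    }

    // Rule of thumb: the heartbeat interval should be at most 1/3 of the session
    // timeout, so several heartbeats can be missed before the session expires.
    public static boolean followsOneThirdRule(Properties props) {
        int session = Integer.parseInt(props.getProperty("session.timeout.ms"));
        int heartbeat = Integer.parseInt(props.getProperty("heartbeat.interval.ms"));
        return heartbeat > 0 && heartbeat * 3L <= session;
    }

    public static void main(String[] args) {
        Properties props = heartbeatProps(10_000, 3_000); // illustrative values
        System.out.println("one-third rule satisfied: " + followsOneThirdRule(props));
    }
}
```

The same Properties object could then be extended with the usual connection settings and passed to a consumer; that part is left out here to keep the sketch free of external dependencies.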

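Polling is a second mechanism for checking consumer health. The consumer must call poll() again before max.poll.interval.ms expires. If this time expires (usually because of long-running processing), the consumer is likewise considered dead and a rebalance is triggered.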

max.poll.interval.ms: The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member.

Another important point (as of version 0.10.1.0):

rebalance.timeout = max.poll.interval.ms

Since we give the client as much as max.poll.interval.ms to handle a batch of records, this is also the maximum time before a consumer can be expected to rejoin the group in the worst case. We therefore propose to set the rebalance timeout in the Java client to the same value configured with max.poll.interval.ms. When a rebalance begins, the background thread will continue sending heartbeats. The consumer will not rejoin the group until processing completes and the user calls poll(). From the coordinator's perspective, the consumer will not be removed from the group until either 1) their session timeout expires without receiving a heartbeat, or 2) the rebalance timeout expires.

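So, in your situation, if session.timeout.ms expires without a heartbeat from the consumer, a rebalance is started in that consumer group. Once the rebalance has started, the partitions of all consumers in the group are revoked, and Kafka waits for every consumer that is still sending heartbeats to call poll() (a consumer sends its JoinGroupRequest when it polls) until the rebalance timeout expires, which is equal to max.poll.interval.ms.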
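The two removal conditions described above can be written down as a small decision function. This is only a simplified model of the rule quoted from the proposal, not the coordinator's actual code; the class, enum, and parameter names are all hypothetical.

```java
// Simplified model of when the coordinator drops a group member.
// Not Kafka's implementation; all names here are illustrative.
class MemberEvictionSketch {

    enum Outcome { STAYS_IN_GROUP, REMOVED_SESSION_TIMEOUT, REMOVED_REBALANCE_TIMEOUT }

    // msSinceLastHeartbeat: time since the coordinator last heard a heartbeat.
    // rebalanceInProgress:  whether the group is currently rebalancing.
    // msWaitingForRejoin:   time the coordinator has waited for this member to rejoin.
    public static Outcome evaluate(long msSinceLastHeartbeat, long sessionTimeoutMs,
                                   boolean rebalanceInProgress, long msWaitingForRejoin,
                                   long rebalanceTimeoutMs) {
        // 1) Session timeout expired without a heartbeat.
        if (msSinceLastHeartbeat > sessionTimeoutMs) {
            return Outcome.REMOVED_SESSION_TIMEOUT;
        }
        // 2) Member still heartbeats but did not rejoin before the rebalance
        //    timeout (set to max.poll.interval.ms in the Java client).
        if (rebalanceInProgress && msWaitingForRejoin > rebalanceTimeoutMs) {
            return Outcome.REMOVED_REBALANCE_TIMEOUT;
        }
        return Outcome.STAYS_IN_GROUP;
    }

    public static void main(String[] args) {
        // Heartbeats stopped 15 s ago with a 10 s session timeout.
        System.out.println(evaluate(15_000, 10_000, false, 0, 300_000));
        // Heartbeats are fine, but the member has not polled during a long rebalance.
        System.out.println(evaluate(1_000, 10_000, true, 310_000, 300_000));
    }
}
```

In this model a consumer whose heartbeat thread is alive but whose processing thread is stuck is only caught by the second condition, which is why both timeouts exist.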

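During the rebalance you can still process the messages you already have, but you cannot commit them; the commit fails with a CommitFailedException carrying this message: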

Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
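To make the second remedy in that message concrete, the sketch below estimates whether a full batch can be processed within max.poll.interval.ms and, if not, how far max.poll.records would have to be reduced. It assumes a fixed worst-case processing time per record, which is a simplification; the class and method names are hypothetical, and 300000 and 500 are used because they are the Java client defaults for max.poll.interval.ms and max.poll.records.

```java
// Back-of-the-envelope poll budget check; names are illustrative only.
class PollBudgetSketch {

    // Worst-case time to process one full batch, assuming a fixed cost per record.
    public static long worstCaseBatchMs(int maxPollRecords, long perRecordMs) {
        return (long) maxPollRecords * perRecordMs;
    }

    // True if a full batch finishes strictly before max.poll.interval.ms elapses.
    public static boolean fitsPollInterval(int maxPollRecords, long perRecordMs,
                                           long maxPollIntervalMs) {
        return worstCaseBatchMs(maxPollRecords, perRecordMs) < maxPollIntervalMs;
    }

    // Largest max.poll.records whose worst-case batch still fits the interval.
    public static int largestSafeMaxPollRecords(long perRecordMs, long maxPollIntervalMs) {
        if (perRecordMs <= 0 || maxPollIntervalMs <= 0) {
            throw new IllegalArgumentException("durations must be positive");
        }
        long records = (maxPollIntervalMs - 1) / perRecordMs;
        return (int) Math.min(records, Integer.MAX_VALUE);
    }

    public static void main(String[] args) {
        long maxPollIntervalMs = 300_000; // client default (5 minutes)
        int maxPollRecords = 500;         // client default
        long perRecordMs = 1_000;         // assumed worst-case cost per record

        System.out.println("fits: "
                + fitsPollInterval(maxPollRecords, perRecordMs, maxPollIntervalMs));
        System.out.println("largest safe max.poll.records: "
                + largestSafeMaxPollRecords(perRecordMs, maxPollIntervalMs));
    }
}
```

In practice some headroom below the computed limit is advisable, since real per-record processing time varies.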

For more information, you can check this.