模拟 Kafka CommitFailedException
Simulate Kafka CommitFailedException
我正在尝试模拟 Kafka 抛出的 CommitFailedException。
我手动将 "session.timeout.ms" 设置为 10000 毫秒,将 "enable.auto.commit" 设置为 false。
之后,Kafkaconsumer.poll(),我有语句,Thread.sleep(12000),之后我进行提交。我希望由于线程在下一次轮询之前需要 12 秒,消费者应该被标记为已死并且应该抛出 CommitFailedException。但是,该过程执行顺利。
如何模拟 KafkaConsumer 抛出的异常。
consumer.subscribe(Arrays.asList("foo"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
}
try {
Thread.sleep(12000);
}catch (Exception e){
e.printStackTrace();
}
consumer.commitSync();
}
Kafka 通过 单独的线程 使用心跳机制来检查消费者的健康状况。消费者心跳线程必须在 session.timeout.ms
时间到期之前向代理发送心跳。
heartbeat.interval.ms: The expected time between heartbeats to the
consumer coordinator when using Kafka's group management facilities.
Heartbeats are used to ensure that the consumer's session stays active
and to facilitate rebalancing when new consumers join or leave the
group.
session.timeout.ms: The timeout used to detect client failures when
using Kafka's group management facility. The client sends periodic
heartbeats to indicate its liveness to the broker. If no heartbeats
are received by the broker before the expiration of this session
timeout, then the broker will remove this client from the group and
initiate a rebalance.
另一种检查消费者活跃度的机制是轮询。消费者应该在 max.poll.interval.ms
到期之前进行 poll()。如果这个时间到期(通常长 运行 进程导致这个问题)再次消费者被认为是死的。
max.poll.interval.ms: The maximum delay between invocations of poll()
when using consumer group management. This places an upper bound on
the amount of time that the consumer can be idle before fetching more
records. If poll() is not called before expiration of this timeout,
then the consumer is considered failed and the group will rebalance in
order to reassign the partitions to another member.
如果由于 session.timeout.ms
中没有心跳或 max.poll.interval.ms
中没有轮询,Kafka 认为消费者已死亡,消费者无法提交消息并获取 CommitFailedException
.
CommitFailedException: This exception is raised when an offset commit with KafkaConsumer.commitSync() fails with an unrecoverable
error. This can happen when a group rebalance completes before the
commit could be successfully applied. In this case, the commit cannot
generally be retried because some of the partitions may have already
been assigned to another member in the group.
因此;因为心跳线程是一个单独的线程,所以代码中的睡眠不会影响它。但在您的情况下,您可以将 max.poll.interval.ms
设置为 10 秒以获得 CommitFailedException
.
我正在尝试模拟 Kafka 抛出的 CommitFailedException。
我手动将 "session.timeout.ms" 设置为 10000 毫秒,将 "enable.auto.commit" 设置为 false。
之后,Kafkaconsumer.poll(),我有语句,Thread.sleep(12000),之后我进行提交。我希望由于线程在下一次轮询之前需要 12 秒,消费者应该被标记为已死并且应该抛出 CommitFailedException。但是,该过程执行顺利。
如何模拟 KafkaConsumer 抛出的异常。
consumer.subscribe(Arrays.asList("foo"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
}
try {
Thread.sleep(12000);
}catch (Exception e){
e.printStackTrace();
}
consumer.commitSync();
}
Kafka 通过 单独的线程 使用心跳机制来检查消费者的健康状况。消费者心跳线程必须在 session.timeout.ms
时间到期之前向代理发送心跳。
heartbeat.interval.ms: The expected time between heartbeats to the consumer coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the consumer's session stays active and to facilitate rebalancing when new consumers join or leave the group.
session.timeout.ms: The timeout used to detect client failures when using Kafka's group management facility. The client sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove this client from the group and initiate a rebalance.
另一种检查消费者活跃度的机制是轮询。消费者应该在 max.poll.interval.ms
到期之前进行 poll()。如果这个时间到期(通常长 运行 进程导致这个问题)再次消费者被认为是死的。
max.poll.interval.ms: The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member.
如果由于 session.timeout.ms
中没有心跳或 max.poll.interval.ms
中没有轮询,Kafka 认为消费者已死亡,消费者无法提交消息并获取 CommitFailedException
.
CommitFailedException: This exception is raised when an offset commit with KafkaConsumer.commitSync() fails with an unrecoverable error. This can happen when a group rebalance completes before the commit could be successfully applied. In this case, the commit cannot generally be retried because some of the partitions may have already been assigned to another member in the group.
因此;因为心跳线程是一个单独的线程,所以代码中的睡眠不会影响它。但在您的情况下,您可以将 max.poll.interval.ms
设置为 10 秒以获得 CommitFailedException
.