部署过程中kafka生产者端的消息会丢失吗?
Can messages be lost at kafka producer side during deployment?
我们面临一个特殊的问题,当我们向 Kafka 生成消息时,有时在消费者端找不到它。我们尝试进一步调试并启用 onSuccess() 和 onFailure() 回调。我们得到的主要问题是 -
org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
为了解决这个问题,我们将重试次数增加到 10 次,这几乎完全解决了这个问题。
但是,我们发现了 3 条消息(每条消息都在不同的时间)我们既没有 onSuccess() 也没有 onFailure() 回调。就是在交流中迷路了,这么说吧!
现在,这发生在应用程序被取消重新部署之前。我从 Kafka Producer Config 了解到,默认批处理大小为 16KB,它会在实际将消息发送给代理之前等待批处理被填充(为了简单起见,我特意去掉了 linger.ms 考虑)。
我的问题是,当系统强制关闭部署时,Kafka批处理中的所有消息是否会丢失?如果是,我们如何解决这个问题?
请帮助我,因为这是我们在生产中面临的问题。
非常感谢!
如果您正在使用批处理并且服务器死机(kill -9、System.exit()
或电源故障),您可能会丢失消息。
如果您正在使用 Spring 引导并执行有序关闭 (Ctrl-C) 或以其他方式关闭 Spring ApplicationContext(例如使用 ShutDownHook),您不应该丢失任何东西,因为生产者将在上下文关闭期间关闭,强制推送部分批处理。
如果挂起的发送无法完成,您应该会看到一条日志消息:
log.info("Proceeding to force close the producer since pending requests could not be completed " +
"within timeout {} ms.", timeoutMs);
您可以在KafkaProducer
中看到close()代码。
我们面临一个特殊的问题,当我们向 Kafka 生成消息时,有时在消费者端找不到它。我们尝试进一步调试并启用 onSuccess() 和 onFailure() 回调。我们得到的主要问题是 -
org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
为了解决这个问题,我们将重试次数增加到 10 次,这几乎完全解决了这个问题。
但是,我们发现了 3 条消息(每条消息都在不同的时间)我们既没有 onSuccess() 也没有 onFailure() 回调。就是在交流中迷路了,这么说吧!
现在,这发生在应用程序被取消重新部署之前。我从 Kafka Producer Config 了解到,默认批处理大小为 16KB,它会在实际将消息发送给代理之前等待批处理被填充(为了简单起见,我特意去掉了 linger.ms 考虑)。
我的问题是,当系统强制关闭部署时,Kafka批处理中的所有消息是否会丢失?如果是,我们如何解决这个问题?
请帮助我,因为这是我们在生产中面临的问题。
非常感谢!
如果您正在使用批处理并且服务器死机(kill -9、System.exit()
或电源故障),您可能会丢失消息。
如果您正在使用 Spring 引导并执行有序关闭 (Ctrl-C) 或以其他方式关闭 Spring ApplicationContext(例如使用 ShutDownHook),您不应该丢失任何东西,因为生产者将在上下文关闭期间关闭,强制推送部分批处理。
如果挂起的发送无法完成,您应该会看到一条日志消息:
log.info("Proceeding to force close the producer since pending requests could not be completed " +
"within timeout {} ms.", timeoutMs);
您可以在KafkaProducer
中看到close()代码。