如何从 Spring Cloud Stream 中 producer.send() 发送的异常中恢复

How to recover from exceptions sent by producer.send() in Spring Cloud Stream

我们经历了以下场景:

默认情况下,MessageChannel.send() returns true 不会抛出任何异常,即使最终 KafkaProducer 无法成功发送消息。我们观察到,在此调用后大约 30 秒,日志中出现以下消息:Expiring 10 record(s) for helloworld-topic-1 due to 30008 ms has passed since batch creation plus linger time

在我们的例子中,这是不可接受的,因为我们必须确保在调用 MessageChannel.send() 的 return 时刻,所有消息最终都传送到 Kafka。

我们打开了 spring.cloud.stream.kafka.bindings.<channelName>.producer.synctrue,这与文档描述的完全一样。它阻塞了生产者确认交付成功或失败的调用者(MessageTimeoutExceptionInterruptedExceptionExecutionException),所有这些都由KafkaProducerMessageHandler控制。这似乎是我们最好的方法,因为在我们的案例中性能影响可以忽略不计。

但是,如果抛出异常,我们是否需要自己处理重试? (例如在我们的客户端代码中 @Retryable

这是一个简单的实验项目:https://github.com/phdezann/spring-cloud-bus-kafka-helloworld

如果send()@StreamListener线程上执行,异常抛回binder,binder重试配置会执行重试。

但是,由于您是在 HTTP 线程上执行发送,因此您需要自己重试(在 RetryTemplate() 范围内调用发送)或创建控制器方法 @Retryable