如何从 Spring Cloud Stream 中 producer.send() 发送的异常中恢复
How to recover from exceptions sent by producer.send() in Spring Cloud Stream
我们经历了以下场景:
- 我们有一个由3个节点组成的Kafka集群,创建的每个主题有3个分区
- 一条消息通过
MessageChannel.send()
发送,为分区 1 生成一条记录
- 充当该分区的分区领导者的代理失败
默认情况下,MessageChannel.send()
returns true
不会抛出任何异常,即使最终 KafkaProducer 无法成功发送消息。我们观察到,在此调用后大约 30 秒,日志中出现以下消息:Expiring 10 record(s) for helloworld-topic-1 due to 30008 ms has passed since batch creation plus linger time
在我们的例子中,这是不可接受的,因为我们必须确保在调用 MessageChannel.send()
的 return 时刻,所有消息最终都传送到 Kafka。
我们打开了 spring.cloud.stream.kafka.bindings.<channelName>.producer.sync
到 true
,这与文档描述的完全一样。它阻塞了生产者确认交付成功或失败的调用者(MessageTimeoutException
、InterruptedException
、ExecutionException
),所有这些都由KafkaProducerMessageHandler
控制。这似乎是我们最好的方法,因为在我们的案例中性能影响可以忽略不计。
但是,如果抛出异常,我们是否需要自己处理重试? (例如在我们的客户端代码中 @Retryable
)
这是一个简单的实验项目:https://github.com/phdezann/spring-cloud-bus-kafka-helloworld
如果send()
在@StreamListener
线程上执行,异常抛回binder,binder重试配置会执行重试。
但是,由于您是在 HTTP 线程上执行发送,因此您需要自己重试(在 RetryTemplate()
范围内调用发送)或创建控制器方法 @Retryable
。
我们经历了以下场景:
- 我们有一个由3个节点组成的Kafka集群,创建的每个主题有3个分区
- 一条消息通过
MessageChannel.send()
发送,为分区 1 生成一条记录
- 充当该分区的分区领导者的代理失败
默认情况下,MessageChannel.send()
returns true
不会抛出任何异常,即使最终 KafkaProducer 无法成功发送消息。我们观察到,在此调用后大约 30 秒,日志中出现以下消息:Expiring 10 record(s) for helloworld-topic-1 due to 30008 ms has passed since batch creation plus linger time
在我们的例子中,这是不可接受的,因为我们必须确保在调用 MessageChannel.send()
的 return 时刻,所有消息最终都传送到 Kafka。
我们打开了 spring.cloud.stream.kafka.bindings.<channelName>.producer.sync
到 true
,这与文档描述的完全一样。它阻塞了生产者确认交付成功或失败的调用者(MessageTimeoutException
、InterruptedException
、ExecutionException
),所有这些都由KafkaProducerMessageHandler
控制。这似乎是我们最好的方法,因为在我们的案例中性能影响可以忽略不计。
但是,如果抛出异常,我们是否需要自己处理重试? (例如在我们的客户端代码中 @Retryable
)
这是一个简单的实验项目:https://github.com/phdezann/spring-cloud-bus-kafka-helloworld
如果send()
在@StreamListener
线程上执行,异常抛回binder,binder重试配置会执行重试。
但是,由于您是在 HTTP 线程上执行发送,因此您需要自己重试(在 RetryTemplate()
范围内调用发送)或创建控制器方法 @Retryable
。