SpringBoot ListenableFuture中如何处理异常

How to handle exceptions in SpringBoot ListenableFuture

所以我有一个像这样启动的 SpringBoot 端点控制器:

@RequestMapping(value = "/post", method = RequestMethod.POST, produces = MediaType.APPLICATION_JSON_VALUE)
public Response post(@Valid @RequestBody Message message) throws FailedToPostException {
    message.setRecieveTime(System.currentTimeMillis());

    return this.service.post(message);
}

和 post 函数:

public Response post(Message message) throws FailedToPostException{


    ListenableFuture<SendResult<String, Message>> future = kafkaTemplate.send("topicName", message);
    future.addCallback(new ListenableFutureCallback<SendResult<String, Message>>() {

        @Override
        public void onSuccess(SendResult<String, Message> result) {
            LOGGER.info("Post Finished. '{}' with offset: {}", message,
                        result.getRecordMetadata().offset());
        }

        @Override
        public void onFailure(Throwable ex) {
            LOGGER.error("Message Post Failed. '{}'", message, ex);

            long nowMillis = System.currentTimeMillis();
            int diffSeconds =  (int) ((nowMillis - message.getRecieveTime()) / 1000);

            if (diffSeconds >= 10) {
                LOGGER.debug("timeout sending message to Kafka, aborting.");
                return;
            }
            else {
                post(message);
            }
        }
    });

    LOGGER.debug("D: " + Utils.getMetricValue("buffer-available-bytes", kafkaTemplate));

    return new Response("Message Posted");
}

现在你可以看到,我们正试图确保,如果 kafkaTemplate.send 失败,我们将再次递归调用 post(message) 最多 10 秒,直到生产者内存缓冲区清除并且消息通过。

问题是:

  1. 我们希望能够 return 对端点客户端的失败响应(例如:"Failed to acknowledge the message")。
  2. 有没有更好的方法来处理上面这段代码中来自 Future 的异常?
  3. 这里有办法避免使用递归函数吗?我们这样做是因为我们想尝试将消息传递给 Kafka 大约 10 秒钟,然后再将其作为电子邮件发送以供查看。

旁注:我仍然没有使用 kafkaTemplate.metrics() 中的 buffer-available-bytes 属性,我打算使用它来尽量减少出现此问题的可能性,但仍然需要处理以上内容以防万一一些竞争条件

有几种方法可以做到这一点,但我真的很喜欢 Spring Retry 作为解决此类问题的方法。这是一些伪代码,但如果您需要更多关于如何做的细节,我可以让事情更明确:

@Retryable(maxAttempts = 10, value = KafkaSendException.class)
public Response post(Message message) throws FailedToPostException{
ListenableFuture<SendResult<String, Message>> future = kafkaTemplate.send("topicName", message);

    try {
        future.get(1. TimeUnit.SECONDS);
    } catch(SomeException ex) {
        LOGGER.error("Message Post Failed. '{}'", ex.getCause().getMessage(), ex);
        throw ex;
    }

    LOGGER.info("Post Finished. '{}' with offset: {}", message,
                        result.getRecordMetadata().offset());

}

有效地做同样的事情而不用递归。我不建议使用递归代码来处理错误。

控制器应该能够很好地按摩实际的 KafkaSendException @ExceptionHandler