Spring Http 请求上的 Kafka 侦听器

Spring Kafka Listener on Http Request

我正在使用 Spring 和 Kafka,我发出一个 HTTP POST 请求,如下所示,并通过 Kafka 主题向另一个服务发送一些信息。

@RequestMapping(method = RequestMethod.POST, value = "/portfolio")
public void getPortfolio(
       Authentication auth,
    @RequestBody User user
) {
    //Data Transfer Object
    UserDTO dto = user.toDTO();
    dto.setId(((AuthenticatedUser) auth.getPrincipal()).getId());

    //Sending message to Kafka topic
    sender.sendPortfolioRequest(dto);
}

然后我想听取关于不同主题的响应和 HTTP 响应中的 return 数据,但我被困在这里。我可以使用下面的侦听器方法来侦听响应,但不知道如何将两者放在一起。

@KafkaListener(
    topics = Topics.PORTFOLIO_RESULT,
    containerFactory = "portfolioKafkaListenerContainerFactory"
)
public void portfolioListener(UserPortfolioDTO portfolio) {
    System.out.println("Recieved Portfolio: " + portfolio.toString());
}

P.S。我是使用 HTTP 请求的新手,不知道这是否是完成我想要实现的目标的正确方法,或者我是否应该使用 POST 创建一个新资源并重定向到那个或其他东西.

@KafkaListener 无法做到这一点,因为它是单独启动的,并且完全在自己的线程中工作。同时,您希望在 HTTP 请求线程中得到回复。

这里唯一可能的解决方案是 ConsumerFactory 和手动使用 Apache Kafka Consumer。所以,你发送,你从工厂获得一个 Consumer 实例,调用它的 poll() 被阻塞直到结果,为 HTTP 构建响应并关闭 Consumer.

根据我的理解,我建议您启用异步 http 请求,以便能够 link 您的进程。

Creating Asynchronous Methods with springboot

此选项将允许您处理 sendPortfolioRequest 并释放 http 请求(否则您将收到来自客户端的 http 请求超时)。

你尝试做的事情看起来像一个反模式:你想做 link 一个同步 http 请求(因为你的 http 客户端正在等待来自服务器的响应)和一个异步消息系统(卡夫卡)。

为了能够做你想做的事,我建议你更改你的 http 端点并添加一个 websocket 以成为最佳实践。

请看Using WebSocket to build an interactive web application