如何在 REST API 中拥有订阅者?

How to have a subscriber in a REST API?

考虑进入您的 Web 服务器的请求,调用 REST API。此请求正在为服务器提供一项耗时的工作。因此,Web 服务器代码会将工作委托给其他服务器以提供可扩展的解决方案。这种场景通常使用发布者-订阅者模式来实现。

到目前为止一切顺利,但如果我想为结果保留请求怎么办?我可以将发布者-订阅者反过来,所以现在我的 REST API 代码将等待一些发布者在结果准备好时收到通知。

此方案的问题在于订阅通知是以异步方式完成的(这本身就是完美的)。因此,在 Spring 中实现 REST API 等待来自 RabbitMQ 的通知将如下所示:

@RequestMapping(value = "/url", method = RequestMethod.GET)
public ResponseEntity url()
{
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    String command = "Job Description";
    channel.basicPublish("", "Heavy Worker", null, command.getBytes());

    Consumer consumer = new DefaultConsumer(channel)
    {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
            throws IOException
        {
            String result = new String(body, "UTF-8");
            //return the result as the HTTP response
        }
    };
    channel.basicConsume("Heavy Job Result", true, consumer);
}

我试图在上面的代码片段中说明的问题是,在 channel.basicConsume("Heavy Job Result", true, consumer); 之后,响应将返回给客户端并且 Java 不会等待 "Heavy Job Result" 待准备。

是否有已知的做法如何以非轮询方式保持消息请求挂起?

解决方案将取决于您完成繁重的工作需要多长时间。如果它完成的时间少于配置的服务器超时时间,那么您所要做的就是不要像这样 returning CompletableFutureListentableFuture 来阻止您的网络 IO:

@RequestMapping(value = "/url", method = RequestMethod.GET)
public CompletableFuture<ResponseEntity> url() {
  return CompletableFuture.supplyAsync(() -> {
    // do your computations here
  });
}

Spring 会为您处理剩下的事情。

如果它花费的时间比您的服务器超时时间长,那么您可以使用 WebSockets 处理作业完成后从服务器向客户端发送的消息。 查看 Spring WebSocket Support。流程将是:

  1. 收到请求
  2. 将其交给新线程中的长 运行 处理器(例如 CompletableFuture.runAsync())
  3. return 开始这个新话题后立即回复
  4. 作业完成后,您向前端发送一条消息,告知它已完成并包含您想要包含的任何数据