如何在 REST API 中拥有订阅者?
How to have a subscriber in a REST API?
考虑进入您的 Web 服务器的请求,调用 REST API。此请求正在为服务器提供一项耗时的工作。因此,Web 服务器代码会将工作委托给其他服务器以提供可扩展的解决方案。这种场景通常使用发布者-订阅者模式来实现。
到目前为止一切顺利,但如果我想为结果保留请求怎么办?我可以将发布者-订阅者反过来,所以现在我的 REST API 代码将等待一些发布者在结果准备好时收到通知。
此方案的问题在于订阅通知是以异步方式完成的(这本身就是完美的)。因此,在 Spring 中实现 REST API 等待来自 RabbitMQ 的通知将如下所示:
@RequestMapping(value = "/url", method = RequestMethod.GET)
public ResponseEntity url()
{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String command = "Job Description";
channel.basicPublish("", "Heavy Worker", null, command.getBytes());
Consumer consumer = new DefaultConsumer(channel)
{
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException
{
String result = new String(body, "UTF-8");
//return the result as the HTTP response
}
};
channel.basicConsume("Heavy Job Result", true, consumer);
}
我试图在上面的代码片段中说明的问题是,在 channel.basicConsume("Heavy Job Result", true, consumer);
之后,响应将返回给客户端并且 Java 不会等待 "Heavy Job Result"
待准备。
是否有已知的做法如何以非轮询方式保持消息请求挂起?
解决方案将取决于您完成繁重的工作需要多长时间。如果它完成的时间少于配置的服务器超时时间,那么您所要做的就是不要像这样 returning CompletableFuture
或 ListentableFuture
来阻止您的网络 IO:
@RequestMapping(value = "/url", method = RequestMethod.GET)
public CompletableFuture<ResponseEntity> url() {
return CompletableFuture.supplyAsync(() -> {
// do your computations here
});
}
Spring 会为您处理剩下的事情。
如果它花费的时间比您的服务器超时时间长,那么您可以使用 WebSockets 处理作业完成后从服务器向客户端发送的消息。
查看 Spring WebSocket Support。流程将是:
- 收到请求
- 将其交给新线程中的长 运行 处理器(例如 CompletableFuture.runAsync())
- return 开始这个新话题后立即回复
- 作业完成后,您向前端发送一条消息,告知它已完成并包含您想要包含的任何数据
考虑进入您的 Web 服务器的请求,调用 REST API。此请求正在为服务器提供一项耗时的工作。因此,Web 服务器代码会将工作委托给其他服务器以提供可扩展的解决方案。这种场景通常使用发布者-订阅者模式来实现。
到目前为止一切顺利,但如果我想为结果保留请求怎么办?我可以将发布者-订阅者反过来,所以现在我的 REST API 代码将等待一些发布者在结果准备好时收到通知。
此方案的问题在于订阅通知是以异步方式完成的(这本身就是完美的)。因此,在 Spring 中实现 REST API 等待来自 RabbitMQ 的通知将如下所示:
@RequestMapping(value = "/url", method = RequestMethod.GET)
public ResponseEntity url()
{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String command = "Job Description";
channel.basicPublish("", "Heavy Worker", null, command.getBytes());
Consumer consumer = new DefaultConsumer(channel)
{
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException
{
String result = new String(body, "UTF-8");
//return the result as the HTTP response
}
};
channel.basicConsume("Heavy Job Result", true, consumer);
}
我试图在上面的代码片段中说明的问题是,在 channel.basicConsume("Heavy Job Result", true, consumer);
之后,响应将返回给客户端并且 Java 不会等待 "Heavy Job Result"
待准备。
是否有已知的做法如何以非轮询方式保持消息请求挂起?
解决方案将取决于您完成繁重的工作需要多长时间。如果它完成的时间少于配置的服务器超时时间,那么您所要做的就是不要像这样 returning CompletableFuture
或 ListentableFuture
来阻止您的网络 IO:
@RequestMapping(value = "/url", method = RequestMethod.GET)
public CompletableFuture<ResponseEntity> url() {
return CompletableFuture.supplyAsync(() -> {
// do your computations here
});
}
Spring 会为您处理剩下的事情。
如果它花费的时间比您的服务器超时时间长,那么您可以使用 WebSockets 处理作业完成后从服务器向客户端发送的消息。 查看 Spring WebSocket Support。流程将是:
- 收到请求
- 将其交给新线程中的长 运行 处理器(例如 CompletableFuture.runAsync())
- return 开始这个新话题后立即回复
- 作业完成后,您向前端发送一条消息,告知它已完成并包含您想要包含的任何数据