WebFlux:只有一个项目到达后端

WebFlux: Only one item arriving at the backend

我在后台做的是:

@PostMapping(path = "/products", consumes = MediaType.APPLICATION_STREAM_JSON_VALUE)
public void saveProducts(@Valid @RequestBody Flux<Product> products) {
    products.subscribe(product -> log.info("product: " + product.toString()));
}

在前端,我使用以下方式调用它:

this.targetWebClient
            .post()
            .uri(productUri)
            .accept(MediaType.APPLICATION_STREAM_JSON)
            .contentType(MediaType.APPLICATION_STREAM_JSON)
            .body(this.sourceWebClient
                    .get()
                    .uri(uriBuilder -> uriBuilder.path(this.sourceEndpoint + "/id")
                            .queryParam("date", date)
                            .build())
                    .accept(MediaType.APPLICATION_STREAM_JSON)
                    .retrieve()
                    .bodyToFlux(Product.class), Product.class)
            .exchange()
            .subscribe();

现在发生的情况是我有 472 个产品需要保存,但实际上只有一个正在保存。流在第一个之后关闭,我无法找出原因。

如果我这样做:

...
.retrieve()
.bodyToMono(Void.class);

相反,请求甚至没有到达后端。

我也试过修复元素数量:

.body(Flux.just(new Product("123"), new Product("321")...

而且只有第一个到达。

编辑

我修改了代码:

@PostMapping(path = "/products", consumes = 
MediaType.APPLICATION_STREAM_JSON_VALUE)
public Mono<Void> saveProducts(@Valid @RequestBody Flux<Product> products) {
    products.subscribe(product -> this.service.saveProduct(product));

    return Mono.empty();
}

和:

this.targetWebClient
            .post()
            .uri(productUri)
            .accept(MediaType.APPLICATION_STREAM_JSON)
            .contentType(MediaType.APPLICATION_STREAM_JSON)
            .body(this.sourceWebClient
                    .get()
                    .uri(uriBuilder -> uriBuilder.path(this.sourceEndpoint + "/id")
                            .queryParam("date", date)
                            .build())
                    .accept(MediaType.APPLICATION_STREAM_JSON)
                    .retrieve()
                    .bodyToFlux(Product.class), Product.class)
            .exchange()
            .block();

这导致一种产品被保存两次(因为后端端点被调用了两次)但同样只有一种产品。我们在前端也遇到了错误:

IOException: Connection reset by peer

同样适用于:

...
.retrieve()
.bodyToMono(Void.class)
.subscribe();

正在执行以下操作:

this.targetWebClient
            .post()
            .uri(productUri)
            .accept(MediaType.APPLICATION_STREAM_JSON)
            .contentType(MediaType.APPLICATION_STREAM_JSON)
            .body(this.sourceWebClient
                    .get()
                    .uri(uriBuilder -> uriBuilder.path(this.sourceEndpoint + "/id")
                            .queryParam("date", date)
                            .build())
                    .accept(MediaType.APPLICATION_STREAM_JSON)
                    .retrieve()
                    .bodyToFlux(Product.class), Product.class)
            .retrieve();

导致后端不再被调用的行为。

Reactor 文档 does say that nothing happens until you subscribe,但这并不意味着您应该订阅 Spring WebFlux 代码。

以下是您在 Spring WebFlux 中应遵循的一些规则:

  • 如果您需要以反应方式做某事,您的方法的 return 类型应该是 MonoFlux
  • 在 return 反应式错字的方法中,您永远不应调用 blocksubscribetoIterable 或任何其他不 return 反应类型本身
  • 您永远不应该在副作用 DoOnXYZ 运算符中执行 I/O-related,因为它们不是为此而设计的,这会在运行时引起问题

在您的情况下,您的后端应该使用反应式存储库来保存您的数据,并且应该如下所示:

@PostMapping(path = "/products", consumes = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Mono<Void> saveProducts(@Valid @RequestBody Flux<Product> products) {
    return productRepository.saveAll(products).then();
}

在这种情况下,Mono<Void> return 类型意味着您的控制器不会 return 任何响应主体,但在完成请求处理后仍会发出信号。这可能解释了为什么您会看到这种行为 - 当控制器处理完请求时,所有产品都没有保存在数据库中。

此外,请记住上述规则。根据使用 targetWebClient 的位置,对其调用 .subscribe(); 可能不是解决方案。如果它是 returns void 的测试方法,您可能想对其调用 block 并获取结果以测试其上的断言。如果这是一个组件方法,那么您可能应该 return 一个 Publisher 类型作为 return 值。

编辑:

@PostMapping(path = "/products", consumes = 
MediaType.APPLICATION_STREAM_JSON_VALUE)
public Mono<Void> saveProducts(@Valid @RequestBody Flux<Product> products) {
    products.subscribe(product -> this.service.saveProduct(product));

    return Mono.empty();
}

这样做是不对的:

  • 调用订阅将 request/response 的处理与 saveProduct 操作分离。这就像在不同的执行程序中开始该处理。
  • returning Mono.empty() 向 WebFlux 发出信号 Spring 您已立即完成请求处理。所以 Spring WebFlux 将关闭并清理 request/response 资源;但是您的 saveProduct 进程仍然是 运行 并且无法从请求中读取,因为 Spring WebFlux 关闭并清理了它。

如评论中所建议,您可以 wrap blocking operations with Reactor(尽管不建议这样做并且您可能会遇到性能问题)并确保将所有操作连接到单个反应管道中。