WebFlux:只有一个项目到达后端
WebFlux: Only one item arriving at the backend
我在后台做的是:
@PostMapping(path = "/products", consumes = MediaType.APPLICATION_STREAM_JSON_VALUE)
public void saveProducts(@Valid @RequestBody Flux<Product> products) {
products.subscribe(product -> log.info("product: " + product.toString()));
}
在前端,我使用以下方式调用它:
this.targetWebClient
.post()
.uri(productUri)
.accept(MediaType.APPLICATION_STREAM_JSON)
.contentType(MediaType.APPLICATION_STREAM_JSON)
.body(this.sourceWebClient
.get()
.uri(uriBuilder -> uriBuilder.path(this.sourceEndpoint + "/id")
.queryParam("date", date)
.build())
.accept(MediaType.APPLICATION_STREAM_JSON)
.retrieve()
.bodyToFlux(Product.class), Product.class)
.exchange()
.subscribe();
现在发生的情况是我有 472 个产品需要保存,但实际上只有一个正在保存。流在第一个之后关闭,我无法找出原因。
如果我这样做:
...
.retrieve()
.bodyToMono(Void.class);
相反,请求甚至没有到达后端。
我也试过修复元素数量:
.body(Flux.just(new Product("123"), new Product("321")...
而且只有第一个到达。
编辑
我修改了代码:
@PostMapping(path = "/products", consumes =
MediaType.APPLICATION_STREAM_JSON_VALUE)
public Mono<Void> saveProducts(@Valid @RequestBody Flux<Product> products) {
products.subscribe(product -> this.service.saveProduct(product));
return Mono.empty();
}
和:
this.targetWebClient
.post()
.uri(productUri)
.accept(MediaType.APPLICATION_STREAM_JSON)
.contentType(MediaType.APPLICATION_STREAM_JSON)
.body(this.sourceWebClient
.get()
.uri(uriBuilder -> uriBuilder.path(this.sourceEndpoint + "/id")
.queryParam("date", date)
.build())
.accept(MediaType.APPLICATION_STREAM_JSON)
.retrieve()
.bodyToFlux(Product.class), Product.class)
.exchange()
.block();
这导致一种产品被保存两次(因为后端端点被调用了两次)但同样只有一种产品。我们在前端也遇到了错误:
IOException: Connection reset by peer
同样适用于:
...
.retrieve()
.bodyToMono(Void.class)
.subscribe();
正在执行以下操作:
this.targetWebClient
.post()
.uri(productUri)
.accept(MediaType.APPLICATION_STREAM_JSON)
.contentType(MediaType.APPLICATION_STREAM_JSON)
.body(this.sourceWebClient
.get()
.uri(uriBuilder -> uriBuilder.path(this.sourceEndpoint + "/id")
.queryParam("date", date)
.build())
.accept(MediaType.APPLICATION_STREAM_JSON)
.retrieve()
.bodyToFlux(Product.class), Product.class)
.retrieve();
导致后端不再被调用的行为。
Reactor 文档 does say that nothing happens until you subscribe,但这并不意味着您应该订阅 Spring WebFlux 代码。
以下是您在 Spring WebFlux 中应遵循的一些规则:
- 如果您需要以反应方式做某事,您的方法的 return 类型应该是
Mono
或 Flux
- 在 return 反应式错字的方法中,您永远不应调用
block
或 subscribe
、toIterable
或任何其他不 return 反应类型本身
- 您永远不应该在副作用
DoOnXYZ
运算符中执行 I/O-related,因为它们不是为此而设计的,这会在运行时引起问题
在您的情况下,您的后端应该使用反应式存储库来保存您的数据,并且应该如下所示:
@PostMapping(path = "/products", consumes = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Mono<Void> saveProducts(@Valid @RequestBody Flux<Product> products) {
return productRepository.saveAll(products).then();
}
在这种情况下,Mono<Void>
return 类型意味着您的控制器不会 return 任何响应主体,但在完成请求处理后仍会发出信号。这可能解释了为什么您会看到这种行为 - 当控制器处理完请求时,所有产品都没有保存在数据库中。
此外,请记住上述规则。根据使用 targetWebClient
的位置,对其调用 .subscribe();
可能不是解决方案。如果它是 returns void
的测试方法,您可能想对其调用 block
并获取结果以测试其上的断言。如果这是一个组件方法,那么您可能应该 return 一个 Publisher
类型作为 return 值。
编辑:
@PostMapping(path = "/products", consumes =
MediaType.APPLICATION_STREAM_JSON_VALUE)
public Mono<Void> saveProducts(@Valid @RequestBody Flux<Product> products) {
products.subscribe(product -> this.service.saveProduct(product));
return Mono.empty();
}
这样做是不对的:
- 调用订阅将 request/response 的处理与
saveProduct
操作分离。这就像在不同的执行程序中开始该处理。
- returning
Mono.empty()
向 WebFlux 发出信号 Spring 您已立即完成请求处理。所以 Spring WebFlux 将关闭并清理 request/response 资源;但是您的 saveProduct
进程仍然是 运行 并且无法从请求中读取,因为 Spring WebFlux 关闭并清理了它。
如评论中所建议,您可以 wrap blocking operations with Reactor(尽管不建议这样做并且您可能会遇到性能问题)并确保将所有操作连接到单个反应管道中。
我在后台做的是:
@PostMapping(path = "/products", consumes = MediaType.APPLICATION_STREAM_JSON_VALUE)
public void saveProducts(@Valid @RequestBody Flux<Product> products) {
products.subscribe(product -> log.info("product: " + product.toString()));
}
在前端,我使用以下方式调用它:
this.targetWebClient
.post()
.uri(productUri)
.accept(MediaType.APPLICATION_STREAM_JSON)
.contentType(MediaType.APPLICATION_STREAM_JSON)
.body(this.sourceWebClient
.get()
.uri(uriBuilder -> uriBuilder.path(this.sourceEndpoint + "/id")
.queryParam("date", date)
.build())
.accept(MediaType.APPLICATION_STREAM_JSON)
.retrieve()
.bodyToFlux(Product.class), Product.class)
.exchange()
.subscribe();
现在发生的情况是我有 472 个产品需要保存,但实际上只有一个正在保存。流在第一个之后关闭,我无法找出原因。
如果我这样做:
...
.retrieve()
.bodyToMono(Void.class);
相反,请求甚至没有到达后端。
我也试过修复元素数量:
.body(Flux.just(new Product("123"), new Product("321")...
而且只有第一个到达。
编辑
我修改了代码:
@PostMapping(path = "/products", consumes =
MediaType.APPLICATION_STREAM_JSON_VALUE)
public Mono<Void> saveProducts(@Valid @RequestBody Flux<Product> products) {
products.subscribe(product -> this.service.saveProduct(product));
return Mono.empty();
}
和:
this.targetWebClient
.post()
.uri(productUri)
.accept(MediaType.APPLICATION_STREAM_JSON)
.contentType(MediaType.APPLICATION_STREAM_JSON)
.body(this.sourceWebClient
.get()
.uri(uriBuilder -> uriBuilder.path(this.sourceEndpoint + "/id")
.queryParam("date", date)
.build())
.accept(MediaType.APPLICATION_STREAM_JSON)
.retrieve()
.bodyToFlux(Product.class), Product.class)
.exchange()
.block();
这导致一种产品被保存两次(因为后端端点被调用了两次)但同样只有一种产品。我们在前端也遇到了错误:
IOException: Connection reset by peer
同样适用于:
...
.retrieve()
.bodyToMono(Void.class)
.subscribe();
正在执行以下操作:
this.targetWebClient
.post()
.uri(productUri)
.accept(MediaType.APPLICATION_STREAM_JSON)
.contentType(MediaType.APPLICATION_STREAM_JSON)
.body(this.sourceWebClient
.get()
.uri(uriBuilder -> uriBuilder.path(this.sourceEndpoint + "/id")
.queryParam("date", date)
.build())
.accept(MediaType.APPLICATION_STREAM_JSON)
.retrieve()
.bodyToFlux(Product.class), Product.class)
.retrieve();
导致后端不再被调用的行为。
Reactor 文档 does say that nothing happens until you subscribe,但这并不意味着您应该订阅 Spring WebFlux 代码。
以下是您在 Spring WebFlux 中应遵循的一些规则:
- 如果您需要以反应方式做某事,您的方法的 return 类型应该是
Mono
或Flux
- 在 return 反应式错字的方法中,您永远不应调用
block
或subscribe
、toIterable
或任何其他不 return 反应类型本身 - 您永远不应该在副作用
DoOnXYZ
运算符中执行 I/O-related,因为它们不是为此而设计的,这会在运行时引起问题
在您的情况下,您的后端应该使用反应式存储库来保存您的数据,并且应该如下所示:
@PostMapping(path = "/products", consumes = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Mono<Void> saveProducts(@Valid @RequestBody Flux<Product> products) {
return productRepository.saveAll(products).then();
}
在这种情况下,Mono<Void>
return 类型意味着您的控制器不会 return 任何响应主体,但在完成请求处理后仍会发出信号。这可能解释了为什么您会看到这种行为 - 当控制器处理完请求时,所有产品都没有保存在数据库中。
此外,请记住上述规则。根据使用 targetWebClient
的位置,对其调用 .subscribe();
可能不是解决方案。如果它是 returns void
的测试方法,您可能想对其调用 block
并获取结果以测试其上的断言。如果这是一个组件方法,那么您可能应该 return 一个 Publisher
类型作为 return 值。
编辑:
@PostMapping(path = "/products", consumes =
MediaType.APPLICATION_STREAM_JSON_VALUE)
public Mono<Void> saveProducts(@Valid @RequestBody Flux<Product> products) {
products.subscribe(product -> this.service.saveProduct(product));
return Mono.empty();
}
这样做是不对的:
- 调用订阅将 request/response 的处理与
saveProduct
操作分离。这就像在不同的执行程序中开始该处理。 - returning
Mono.empty()
向 WebFlux 发出信号 Spring 您已立即完成请求处理。所以 Spring WebFlux 将关闭并清理 request/response 资源;但是您的saveProduct
进程仍然是 运行 并且无法从请求中读取,因为 Spring WebFlux 关闭并清理了它。
如评论中所建议,您可以 wrap blocking operations with Reactor(尽管不建议这样做并且您可能会遇到性能问题)并确保将所有操作连接到单个反应管道中。