如何用单声道映射通量?
How to map a flux with mono?
我有这两个要求:
Flux<ProductID> getProductIds() {
return this.webClient.get()
.uri(PRODUCT_ID_URI)
.accept(MediaType.APPLICATION_STREAM_JSON)
.retrieve()
.bodyToFlux(ProductID.class);
}
Mono<Product> getProduct(String id) {
return this.itemServiceWebClient.get()
.uri(uriBuilder -> uriBuilder.path(PRODUCT_URI + "/{id}")
.build(id))
.accept(MediaType.APPLICATION_STREAM_JSON)
.exchange()
.flatMap(clientResponse -> clientResponse.bodyToMono(Product.class));
}
有了这些我想做以下事情:
Flux<Product> getProducts() {
return Flux.create(sink -> this.gateway.getProductIds()
.doOnComplete(() -> {
log.info("PRODUCTS COMPLETE");
sink.complete();
})
.flatMap(productId -> this.getProduct(productId.getID()))
.subscribe(product -> {
log.info("NEW PRODUCT: " + product);
sink.next(product);
}));
}
当我调用它时,我得到以下输出:
PRODUCTS COMPLETE
NEW PRODUCT: ...
NEW PRODUCT: ...
....
当然,在结果实际存在之前,流正在关闭,因为异步单声道映射。我怎样才能保持这种非阻塞,同时确保结果在调用 on complete 之前到达?
假设 getProducts
是一个控制器方法,并且您想将这些产品添加到您的模型中以在视图模板中呈现,您可以这样解决这个问题:
@GetMapping("/products")
public String getProducts(Model model) {
Flux<Product> products = this.gateway.getProductIds()
.flatMap(productId -> this.getProduct(productId.getID()));
model.put("products", products);
// return the view name
return "showProducts";
}
我有这两个要求:
Flux<ProductID> getProductIds() {
return this.webClient.get()
.uri(PRODUCT_ID_URI)
.accept(MediaType.APPLICATION_STREAM_JSON)
.retrieve()
.bodyToFlux(ProductID.class);
}
Mono<Product> getProduct(String id) {
return this.itemServiceWebClient.get()
.uri(uriBuilder -> uriBuilder.path(PRODUCT_URI + "/{id}")
.build(id))
.accept(MediaType.APPLICATION_STREAM_JSON)
.exchange()
.flatMap(clientResponse -> clientResponse.bodyToMono(Product.class));
}
有了这些我想做以下事情:
Flux<Product> getProducts() {
return Flux.create(sink -> this.gateway.getProductIds()
.doOnComplete(() -> {
log.info("PRODUCTS COMPLETE");
sink.complete();
})
.flatMap(productId -> this.getProduct(productId.getID()))
.subscribe(product -> {
log.info("NEW PRODUCT: " + product);
sink.next(product);
}));
}
当我调用它时,我得到以下输出:
PRODUCTS COMPLETE
NEW PRODUCT: ...
NEW PRODUCT: ...
....
当然,在结果实际存在之前,流正在关闭,因为异步单声道映射。我怎样才能保持这种非阻塞,同时确保结果在调用 on complete 之前到达?
假设 getProducts
是一个控制器方法,并且您想将这些产品添加到您的模型中以在视图模板中呈现,您可以这样解决这个问题:
@GetMapping("/products")
public String getProducts(Model model) {
Flux<Product> products = this.gateway.getProductIds()
.flatMap(productId -> this.getProduct(productId.getID()));
model.put("products", products);
// return the view name
return "showProducts";
}