如何用单声道映射通量?

How to map a flux with mono?

我有这两个要求:

Flux<ProductID> getProductIds() {
    return this.webClient.get()
            .uri(PRODUCT_ID_URI)
            .accept(MediaType.APPLICATION_STREAM_JSON)
            .retrieve()
            .bodyToFlux(ProductID.class);
}

Mono<Product> getProduct(String id) {
    return this.itemServiceWebClient.get()
            .uri(uriBuilder -> uriBuilder.path(PRODUCT_URI + "/{id}")
                    .build(id))
            .accept(MediaType.APPLICATION_STREAM_JSON)
            .exchange()
            .flatMap(clientResponse -> clientResponse.bodyToMono(Product.class));
}

有了这些我想做以下事情:

Flux<Product> getProducts() {
    return Flux.create(sink -> this.gateway.getProductIds()
            .doOnComplete(() -> {
                log.info("PRODUCTS COMPLETE");

                sink.complete();
            })
            .flatMap(productId -> this.getProduct(productId.getID()))
            .subscribe(product -> {
                log.info("NEW PRODUCT: " + product);

                sink.next(product);
            }));
}

当我调用它时,我得到以下输出:

PRODUCTS COMPLETE
NEW PRODUCT: ...
NEW PRODUCT: ...
....

当然,在结果实际存在之前,流正在关闭,因为异步单声道映射。我怎样才能保持这种非阻塞,同时确保结果在调用 on complete 之前到达?

假设 getProducts 是一个控制器方法,并且您想将这些产品添加到您的模型中以在视图模板中呈现,您可以这样解决这个问题:

@GetMapping("/products")
public String getProducts(Model model) {

    Flux<Product> products = this.gateway.getProductIds()
            .flatMap(productId -> this.getProduct(productId.getID()));
    model.put("products", products);
    // return the view name
    return "showProducts";
}