在 webflux/reactor 流中 "unwrap" Mono 的安全方法?

Safe way to "unwrap" a Mono in a webflux/reactor stream?

我目前正在使用一个使用 Spring WebFlux 和 Project Reactor 的库。我工作的主要领域是反应流结束的地方,数据需要再次回到同步的、阻塞的世界。这是不可避免的,因为我将与之交互的方法不是响应式的(并且不可修改)。我是反应式编程的新手,所以我不确定如何解决这个问题。

我使用的界面与此类似:

interface Resolver {
    Mono<Object> resolve(HttpRequest request);
}

我有一个 class,它使用上述接口的多个实现来获取一些数据。我最终得到了这样的 HashMap

Mono<Map<String, Mono<Object>>> resolvedData;

此时我需要的是从根本上“展开”HashMap 中的 Mono<Object> 值,并使其实际值如下所示:

Mono<Map<String, Object>> actualResolvedData;

我似乎想不出一个干净利落的方法。每当我尝试直接调用 block() 时,都会出现以下异常:

block()/blockFirst()/blockLast() are blocking, which is not supported in thread

我有点理解异常,但同时我不知道如何避免它。我知道这不是“理想的”响应式编程,因为 objective 不会阻塞。这是我无法控制的,因为这是需要这些数据的地方。我错过了一些简单的东西吗?我已经盯着这段代码看了几天了,没有进一步弄清楚这一点。任何建议将不胜感激。谢谢。

非常感谢@MartinTarjányi 帮助解决了我的问题。有人向我展示了一种将 Resovler 接口的 Mono<Object> 输出映射到 Tuple 之前 它被制作成 Map<String, Object> 的方法.这样我就可以使用常规 flatMap 进行第一次转换(我在问题中称之为“展开”),然后将 Tuple 转换为 Map [=18] =] 像以前一样。完整的答案在上面的评论中,但这是最终修复代码的方式:

private Mono<Map<String, Object>> getValues(List<ResolvedData> resolvedData, HttpRequest httpRequest) {
    return Flux.fromIterable(resolvedData).flatMap(data -> {
        Resolver resolver = data.getResolver();
        return resolver.resolve(httpRequest).map(resolvedValue -> Tuples.of(data.getName(), resolvedValue));
    }).collectMap(Tuple2::getT1, Tuple2::getT2);
}