处理 Mono Inside Flux Flatmap

Handling Mono Inside Flux Flatmap

我有一个 Flux of strings。对于每个字符串,我都必须进行远程调用。但问题在于,进行远程调用的方法实际上 return 是响应的 Mono(显然,因为对应于单个请求,所以会有单个响应)。

处理此类情况的正确模式应该是什么?我能想到的一种解决方案是对流元素进行串行(或并行)调用,并将响应减少到单个元素和 return.

代码如下:

fluxObj.flatmap(a -> makeRemoteCall(a)//converts the Mono of the response to a Flux).reduce(...)

我无法在 flatmap 中思考。makeRemoteCall 方法 return 是 Mono。但是 flatmap return 是一个 Flux 的响应。首先,为什么会这样?其次,这是否意味着 returned Flux 包含单个响应对象(在 Mono 中 returned)?

如果您查看文档 flatMap,您可以找到问题的答案:

Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux, sequentially and preserving order using concatenation.

长话短说,

@Test
public void testFlux() {
    Flux<String> oneString = Flux.just("1");

    oneString
        .flatMap(s -> testMono(s))
        .collectList()
        .subscribe(integers -> System.out.println("elements:" + integers));       

}

private Mono<Integer> testMono(String s) {
    return Mono.just(Integer.valueOf(s + "0"));
}

mapper - s -> testMono(s) 其中 testMono(s) 是发布者(在你的例子中 makeRemoteCall(a)) ,它将我的 oneString 类型转换为 Integer.

我收集了Flux结果到List,并打印出来。控制台输出:

elements:[10]

表示flatMap运算符后的结果Flux只包含一个元素

如果 mapper Function return 是一个 Mono,则意味着 中的每个源元素将(最多)有一个派生值=16=].

拥有 Function return:

  • 给定值的空 Mono(例如 Mono.empty())表示此源值是 "ignored"
  • 一个值Mono(就像你的例子)意味着这个源值被异步映射到另一个特定值
  • a Flux 对于给定值具有多个派生值意味着此源值异步映射到多个值

例如,给定以下 flatMap

Flux.just("A", "B")
    .flatMap(v -> Mono.just("value" + v))

订阅以上 Flux<String> 并打印发出的元素将产生:

valueA
valueB

另一个有趣的例子:延迟可能会导致乱序结果。像这样:

Flux.just(300, 100)
    .flatMap(delay -> Mono.delay(Duration.ofMillis(delay))
                          .thenReturn(delay + "ms")
    )

将导致 Flux<String> 产生:

100ms
300ms