处理 Mono Inside Flux Flatmap
Handling Mono Inside Flux Flatmap
我有一个 Flux of strings。对于每个字符串,我都必须进行远程调用。但问题在于,进行远程调用的方法实际上 return 是响应的 Mono(显然,因为对应于单个请求,所以会有单个响应)。
处理此类情况的正确模式应该是什么?我能想到的一种解决方案是对流元素进行串行(或并行)调用,并将响应减少到单个元素和 return.
代码如下:
fluxObj.flatmap(a -> makeRemoteCall(a)//converts the Mono of the response to a Flux).reduce(...)
我无法在 flatmap
中思考。makeRemoteCall
方法 return 是 Mono
。但是 flatmap
return 是一个 Flux
的响应。首先,为什么会这样?其次,这是否意味着 returned Flux
包含单个响应对象(在 Mono
中 returned)?
如果您查看文档 flatMap,您可以找到问题的答案:
Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux, sequentially and preserving order using concatenation.
长话短说,
@Test
public void testFlux() {
Flux<String> oneString = Flux.just("1");
oneString
.flatMap(s -> testMono(s))
.collectList()
.subscribe(integers -> System.out.println("elements:" + integers));
}
private Mono<Integer> testMono(String s) {
return Mono.just(Integer.valueOf(s + "0"));
}
mapper - s -> testMono(s)
其中 testMono(s)
是发布者(在你的例子中 makeRemoteCall(a)
) ,它将我的 oneString
类型转换为 Integer.
我收集了Flux结果到List,并打印出来。控制台输出:
elements:[10]
表示flatMap
运算符后的结果Flux只包含一个元素
如果 mapper Function
return 是一个 Mono
,则意味着 中的每个源元素将(最多)有一个派生值=16=].
拥有 Function
return:
- 给定值的空 Mono(例如
Mono.empty()
)表示此源值是 "ignored"
- 一个值
Mono
(就像你的例子)意味着这个源值被异步映射到另一个特定值
- a
Flux
对于给定值具有多个派生值意味着此源值异步映射到多个值
例如,给定以下 flatMap
:
Flux.just("A", "B")
.flatMap(v -> Mono.just("value" + v))
订阅以上 Flux<String>
并打印发出的元素将产生:
valueA
valueB
另一个有趣的例子:延迟可能会导致乱序结果。像这样:
Flux.just(300, 100)
.flatMap(delay -> Mono.delay(Duration.ofMillis(delay))
.thenReturn(delay + "ms")
)
将导致 Flux<String>
产生:
100ms
300ms
我有一个 Flux of strings。对于每个字符串,我都必须进行远程调用。但问题在于,进行远程调用的方法实际上 return 是响应的 Mono(显然,因为对应于单个请求,所以会有单个响应)。
处理此类情况的正确模式应该是什么?我能想到的一种解决方案是对流元素进行串行(或并行)调用,并将响应减少到单个元素和 return.
代码如下:
fluxObj.flatmap(a -> makeRemoteCall(a)//converts the Mono of the response to a Flux).reduce(...)
我无法在 flatmap
中思考。makeRemoteCall
方法 return 是 Mono
。但是 flatmap
return 是一个 Flux
的响应。首先,为什么会这样?其次,这是否意味着 returned Flux
包含单个响应对象(在 Mono
中 returned)?
如果您查看文档 flatMap,您可以找到问题的答案:
Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux, sequentially and preserving order using concatenation.
长话短说,
@Test
public void testFlux() {
Flux<String> oneString = Flux.just("1");
oneString
.flatMap(s -> testMono(s))
.collectList()
.subscribe(integers -> System.out.println("elements:" + integers));
}
private Mono<Integer> testMono(String s) {
return Mono.just(Integer.valueOf(s + "0"));
}
mapper - s -> testMono(s)
其中 testMono(s)
是发布者(在你的例子中 makeRemoteCall(a)
) ,它将我的 oneString
类型转换为 Integer.
我收集了Flux结果到List,并打印出来。控制台输出:
elements:[10]
表示flatMap
运算符后的结果Flux只包含一个元素
如果 mapper Function
return 是一个 Mono
,则意味着 中的每个源元素将(最多)有一个派生值=16=].
拥有 Function
return:
- 给定值的空 Mono(例如
Mono.empty()
)表示此源值是 "ignored" - 一个值
Mono
(就像你的例子)意味着这个源值被异步映射到另一个特定值 - a
Flux
对于给定值具有多个派生值意味着此源值异步映射到多个值
例如,给定以下 flatMap
:
Flux.just("A", "B")
.flatMap(v -> Mono.just("value" + v))
订阅以上 Flux<String>
并打印发出的元素将产生:
valueA
valueB
另一个有趣的例子:延迟可能会导致乱序结果。像这样:
Flux.just(300, 100)
.flatMap(delay -> Mono.delay(Duration.ofMillis(delay))
.thenReturn(delay + "ms")
)
将导致 Flux<String>
产生:
100ms
300ms