将 Mono 与发出的每个 Flux 元素组合
Combine Mono with every Flux element emitted
我有如下的 Flux 和 Mono:
Mono<MyRequest> req = request.bodyToMono(MyRequest.class);
Mono<List<String>> mono1 = req.map(r -> r.getList());;
Flux<Long> flux1 = req.map(r -> r.getVals()) // getVals() return list of Long
.flatMapMany(Flux::fromIterable);
现在,对于 flux1
中的每个数字,我想调用一个方法,其中参数是来自 flux1
的 ID 和来自 mono1
的 List<String>
。像,
flux1.flatMap(id -> process(id, mono1))
但传递和处理相同 mono1
会导致错误 Only one connection receive subscriber allowed
。我怎样才能达到以上目标?谢谢!
由于这两种信息都来自同一来源,因此您可以 运行 像这样用一个管道完成整个事情,并将两个元素包装在 Tuple
或更好的域对象中,该域对象具有更多含义:
Mono<MyRequest> req = // ...
Flux<Tuple2<Long, List<String>>> tuples = req.flatMapMany(r ->
Flux.fromIterable(r.getVals())
.map(id -> Tuples.of(id, r.getList()))
);
// once there, you can map that with your process method like
tuples.map(tup -> process(tup.getT1(), tup.getT2());
请注意,这看起来很不寻常,这基本上来自您接收的对象的结构。
我有如下的 Flux 和 Mono:
Mono<MyRequest> req = request.bodyToMono(MyRequest.class);
Mono<List<String>> mono1 = req.map(r -> r.getList());;
Flux<Long> flux1 = req.map(r -> r.getVals()) // getVals() return list of Long
.flatMapMany(Flux::fromIterable);
现在,对于 flux1
中的每个数字,我想调用一个方法,其中参数是来自 flux1
的 ID 和来自 mono1
的 List<String>
。像,
flux1.flatMap(id -> process(id, mono1))
但传递和处理相同 mono1
会导致错误 Only one connection receive subscriber allowed
。我怎样才能达到以上目标?谢谢!
由于这两种信息都来自同一来源,因此您可以 运行 像这样用一个管道完成整个事情,并将两个元素包装在 Tuple
或更好的域对象中,该域对象具有更多含义:
Mono<MyRequest> req = // ...
Flux<Tuple2<Long, List<String>>> tuples = req.flatMapMany(r ->
Flux.fromIterable(r.getVals())
.map(id -> Tuples.of(id, r.getList()))
);
// once there, you can map that with your process method like
tuples.map(tup -> process(tup.getT1(), tup.getT2());
请注意,这看起来很不寻常,这基本上来自您接收的对象的结构。