将 Mono 与发出的每个 Flux 元素组合

Combine Mono with every Flux element emitted

我有如下的 Flux 和 Mono:

Mono<MyRequest> req = request.bodyToMono(MyRequest.class);
Mono<List<String>> mono1 = req.map(r -> r.getList());;
Flux<Long> flux1 = req.map(r -> r.getVals()) // getVals() return list of Long
        .flatMapMany(Flux::fromIterable);

现在,对于 flux1 中的每个数字,我想调用一个方法,其中参数是来自 flux1 的 ID 和来自 mono1List<String>。像,

flux1.flatMap(id -> process(id, mono1)) 

但传递和处理相同 mono1 会导致错误 Only one connection receive subscriber allowed。我怎样才能达到以上目标?谢谢!

由于这两种信息都来自同一来源,因此您可以 运行 像这样用一个管道完成整个事情,并将两个元素包装在 Tuple 或更好的域对象中,该域对象具有更多含义:

Mono<MyRequest> req = // ...
Flux<Tuple2<Long, List<String>>> tuples = req.flatMapMany(r ->
        Flux.fromIterable(r.getVals())
                .map(id -> Tuples.of(id, r.getList()))
);
// once there, you can map that with your process method like
tuples.map(tup -> process(tup.getT1(), tup.getT2());

请注意,这看起来很不寻常,这基本上来自您接收的对象的结构。