订阅/迭代 Mono 列表

Subscribe / Iterate a List of Mono

我有一个规则列表 return Mono<Result>,我需要执行它们和 return 另一个带有每个函数结果的 Mono 列表。

Mono<List< Rule>>Mono<List< RuleResult>>

我有这个,它可以工作,但阻止执行对我来说似乎不正确:

List<Rule> RuleSet
...
Mono<List<RuleResult>> result= Mono.just(RuleSet.stream().map(rule -> rule.assess(object).block()).collect(Collectors.toList()));

如何将其转换为阻塞较少的版本?

我尝试了以下方法:

//Create a List of Monos
List<Mono<RuleResult>> ruleStream=statefulRuleSet.stream().map(rule -> rule.assess(assessmentObject)).collect(Collectors.toList());

//Cannot convert type
Flux.zip(ruleStream,...)).collectList();

//Not sure how to do this
Flux.fromIterable(ruleStream...).collectList();

也许我正在考虑一个错误的整体解决方案,有人有任何指示吗?

例如:

interface Rule extends Function<Object, Mono<Object>> { }

public static void main(String[] args) {

  Rule rule1 = (o) -> Mono.just(Integer.valueOf(o.hashCode()));
  Rule rule2 = (o) -> Mono.just(o.toString());
  List<Rule> rules = List.of(rule1, rule2);

  Object object = new Object();

  Mono<List<Object>> result = Flux.fromIterable(rules)
    .flatMapSequential(rule -> rule.apply(object)).collectList();

  result.subscribe(System.out::println);
}

使用 flatMapSequential 允许您同时等待最多 maxConcurrency 个结果。 maxConcurrency 值可以指定为 flatMapSequential 的附加参数。在 reactor-core 3.3.8 中它的默认值为 256.