如果满足条件,反应器如何重复某些步骤 n 次

Reactor how to repeat some step n times if condition is met

请帮我做reactor 我需要检查一个条件最多 n 次,return 毕竟是最终结果

我发现 reactor 有 reactor-extra 模块

https://projectreactor.io/docs/extra/snapshot/api/reactor/retry/Repeat.html

有建筑 Repeat.create(java.util.function.Predicate<? super RepeatContext<T>> predicate, long n) 重复 n 次的函数,仅当谓词 return 为真时。

看起来是正确的解决方案,但我不明白应该在哪里
那个我想重复的动作? 我有很多动作的 Flux,但我只想重复一个

请举例代码

谢谢

private int culculateNextResult(some params) {
          // some implementation  
 }



private Boolean compareResults(int prevRes, int nextRes) {
          // some implementation
 }

 public Flux<Boolean> run(some params, Flux<Integer> prevResults){

      return prevResults.map(elem -> compareResults(elem, culculateNextResult(some params)));

 // THIS LOGIC SHOULD BE REPEATED N times if compareResults(elem,       
 // culculateNextResult(some params))) == false, if true, we don't need 
// to repeat 
     }

我想重复 compareResults(elem, culculateNextResult(some params))) 直到它为真。但 n 倍最大值和 return 结果通量

Flux.repeatMono.repeat 将重新订阅源,因此源的每个先前步骤都将通过新订阅重复。

由于calculateNextResultcompareResults在你的例子中都是同步操作,你可以使用一个简单的for循环来重复...

    public Flux<Boolean> run(some params, Flux<Integer> prevResults){
        return prevResults.map(elem -> {
            for (int i = 0; i < 5; i++) {
                if (compareResults(elem, calculateNextResult(some params))) {
                    return true;
                }
            }
            return false;
        });
    }

如果 calculateNextResultcompareResults 是返回 Mono 的响应式方法,那么您可以使用 flatMap 而不是 map,并使用 Mono.repeat* 方法。

例如,像这样:

    private Mono<Integer> calculateNextResult(some params) {
        // some implementation
    }

    private Mono<Boolean> compareResults(int prevRes, int nextRes) {
        // some implementation
    }
    public Flux<Boolean> run(some params, Flux<Integer> prevResults){

        return prevResults.flatMap(prevResult -> 

            calculateNextResult(some params)
                    .flatMap(nextResult -> compareResults(prevResult, nextResult))
                    .filter(comparisonResult -> comparisonResult)
                    .repeatWhenEmpty(Repeat.times(5))
                    .defaultIfEmpty(false));
    }

在此示例中,repeatWhenEmpty 将导致对在 flatMap 中创建的 Mono 的新订阅,这将导致 calculateNextResult 重新计算(假设从 calculateNextResult 返回的 Mono 是设置为计算每个订阅的值)。