如何在 Reactor 中结合使用 retry 和 repeat?

How to use retry and repeat combined in Reactor?

我正在轮询队列消息,如果发生错误我想做 retry,如果我没有收到任何消息我想做 repeat。重试和重复的总次数应等于 numRetries。实现此目标的最佳方法是什么?

这就是我到目前为止所取得的成就:

Flux.defer { awsSQSClient.receiveMessage(10) }
            .retry(numRetries)
            .repeatWhen { emittedEachAttempt ->
                emittedEachAttempt.handle<Flux<Message>> { lastEmitted, sink ->
                    val ctx = sink.currentContext()
                    val rl = ctx.getOrDefault("repeatsLeft", numRetries)
                    if (rl > 0 && lastEmitted == 0L) {
                        sink.next(Context.of("repeatsLeft", rl - 1)) // <-- This line doesn't compile although I copied it from reactor documentation
                    } else {
                        sink.next(Flux.empty())
                    }

                }
            }

我正在为 repeatWhen lambda 而苦苦挣扎,我也不明白如何耦合 retryrepeat 计数。

I would like to do retry if some error happens or repeat if I don't receive any messages.

在这里区分重试和重复次数没有多大意义,尤其是当您想要整体跟踪总重复次数/重试次数时。两个运营商本质上都会做同样的事情(重新订阅源发布者),只是他们在不同的情况下这样做。

你会更好用:

.switchIfEmpty(Mono.error(new RuntimeException()))

...在重试行上方,然后完全废弃 repeatWhen() 部分。然后,您可以将 numRetries() 设置为您想要的重试/重复总数。

(当然,您可能还想在实际使用中使用更合适的异常类型。)