如何在 Mono.retry 中实现变量退避?

How do I implement variable backoff in Mono.retry?

我觉得我缺少一些简单的基本细微差别,但出于某种原因 Mono.delay() 对我不起作用。我有 Mono 发出可以被限制的 http 请求。我需要等待提供的时间重试。这是现在的样子

internal fun <T> Mono<T>.retryAfter(maxRetries: Int, uri: URI): Mono<T> {
    // `this` is instance of Mono<T>
    return this.retryWhen(Retry.from {
        it.map { rs ->
            val ex = rs.failure()
            if (ex is RetryAfterException && rs.totalRetries() < maxRetries) {
                println("*** API throttling on call to $uri. Will wait for ${ex.delay}. Retry count: ${rs.totalRetries()}. ms ${System.currentTimeMillis()}")
                Mono.delay(ex.delay.plusMillis(500), Schedulers.parallel()).then(Mono.defer({
                    println("   Waited. ${System.currentTimeMillis()}")
                    this
                }))
            } else
                Mono.error(rs.failure())
        }
    })
}

您可以使用内置的重试生成器 我建议您使用 Retry.fixedDelay() ,它允许您定义最大重试次数和每次尝试之间的延迟。当达到最大重试次数时,你会得到一个 Mono.error()

    internal fun <T> Mono<T>.retryAfter(maxRetries: Long, uri: URI): Mono<T> {
        // `this` is instance of Mono<T>
        return this.retryWhen(Retry.fixedDelay(maxRetries, Duration.ofMillis(500))
                .doBeforeRetry { rs: Retry.RetrySignal? -> println("Will retry " + rs?.totalRetries()) }
                .doAfterRetry{ rs: Retry.RetrySignal? -> println("Has retried " + rs?.totalRetries()) }
                .onRetryExhaustedThrow { _, rs ->  rs.failure()})
    }

您正在使用 map,这导致 Flux<Mono<?>> 返回给操作员以进行重试控制。从操作员的角度 (Flux<?>),任何 onNext 都意味着“你应该重试”。无论是 onNext("example") 还是 onNext(Mono.error(err)) 都没有关系。

不使用 map,而是使用 concatMap。您在函数中生成的 Mono 将正确生成 Flux<?>,其中 if 的“延迟”分支生成(延迟)onNext,而另一个分支生成onError.