如何在 Mono.retry 中实现变量退避?
How do I implement variable backoff in Mono.retry?
我觉得我缺少一些简单的基本细微差别,但出于某种原因 Mono.delay()
对我不起作用。我有 Mono 发出可以被限制的 http 请求。我需要等待提供的时间重试。这是现在的样子
internal fun <T> Mono<T>.retryAfter(maxRetries: Int, uri: URI): Mono<T> {
// `this` is instance of Mono<T>
return this.retryWhen(Retry.from {
it.map { rs ->
val ex = rs.failure()
if (ex is RetryAfterException && rs.totalRetries() < maxRetries) {
println("*** API throttling on call to $uri. Will wait for ${ex.delay}. Retry count: ${rs.totalRetries()}. ms ${System.currentTimeMillis()}")
Mono.delay(ex.delay.plusMillis(500), Schedulers.parallel()).then(Mono.defer({
println(" Waited. ${System.currentTimeMillis()}")
this
}))
} else
Mono.error(rs.failure())
}
})
}
您可以使用内置的重试生成器
我建议您使用 Retry.fixedDelay()
,它允许您定义最大重试次数和每次尝试之间的延迟。当达到最大重试次数时,你会得到一个 Mono.error()
internal fun <T> Mono<T>.retryAfter(maxRetries: Long, uri: URI): Mono<T> {
// `this` is instance of Mono<T>
return this.retryWhen(Retry.fixedDelay(maxRetries, Duration.ofMillis(500))
.doBeforeRetry { rs: Retry.RetrySignal? -> println("Will retry " + rs?.totalRetries()) }
.doAfterRetry{ rs: Retry.RetrySignal? -> println("Has retried " + rs?.totalRetries()) }
.onRetryExhaustedThrow { _, rs -> rs.failure()})
}
您正在使用 map
,这导致 Flux<Mono<?>>
返回给操作员以进行重试控制。从操作员的角度 (Flux<?>
),任何 onNext
都意味着“你应该重试”。无论是 onNext("example")
还是 onNext(Mono.error(err))
都没有关系。
不使用 map
,而是使用 concatMap
。您在函数中生成的 Mono
将正确生成 Flux<?>
,其中 if
的“延迟”分支生成(延迟)onNext
,而另一个分支生成onError
.
我觉得我缺少一些简单的基本细微差别,但出于某种原因 Mono.delay()
对我不起作用。我有 Mono 发出可以被限制的 http 请求。我需要等待提供的时间重试。这是现在的样子
internal fun <T> Mono<T>.retryAfter(maxRetries: Int, uri: URI): Mono<T> {
// `this` is instance of Mono<T>
return this.retryWhen(Retry.from {
it.map { rs ->
val ex = rs.failure()
if (ex is RetryAfterException && rs.totalRetries() < maxRetries) {
println("*** API throttling on call to $uri. Will wait for ${ex.delay}. Retry count: ${rs.totalRetries()}. ms ${System.currentTimeMillis()}")
Mono.delay(ex.delay.plusMillis(500), Schedulers.parallel()).then(Mono.defer({
println(" Waited. ${System.currentTimeMillis()}")
this
}))
} else
Mono.error(rs.failure())
}
})
}
您可以使用内置的重试生成器
我建议您使用 Retry.fixedDelay()
,它允许您定义最大重试次数和每次尝试之间的延迟。当达到最大重试次数时,你会得到一个 Mono.error()
internal fun <T> Mono<T>.retryAfter(maxRetries: Long, uri: URI): Mono<T> {
// `this` is instance of Mono<T>
return this.retryWhen(Retry.fixedDelay(maxRetries, Duration.ofMillis(500))
.doBeforeRetry { rs: Retry.RetrySignal? -> println("Will retry " + rs?.totalRetries()) }
.doAfterRetry{ rs: Retry.RetrySignal? -> println("Has retried " + rs?.totalRetries()) }
.onRetryExhaustedThrow { _, rs -> rs.failure()})
}
您正在使用 map
,这导致 Flux<Mono<?>>
返回给操作员以进行重试控制。从操作员的角度 (Flux<?>
),任何 onNext
都意味着“你应该重试”。无论是 onNext("example")
还是 onNext(Mono.error(err))
都没有关系。
不使用 map
,而是使用 concatMap
。您在函数中生成的 Mono
将正确生成 Flux<?>
,其中 if
的“延迟”分支生成(延迟)onNext
,而另一个分支生成onError
.