如何在 kotlin 协程上进行指数退避重试
How to Exponential Backoff retry on kotlin coroutines
我正在使用 kotlin 协程进行网络请求,使用扩展方法调用 class 进行这样的改造
public suspend fun <T : Any> Call<T>.await(): T {
return suspendCancellableCoroutine { continuation ->
enqueue(object : Callback<T> {
override fun onResponse(call: Call<T>?, response: Response<T?>) {
if (response.isSuccessful) {
val body = response.body()
if (body == null) {
continuation.resumeWithException(
NullPointerException("Response body is null")
)
} else {
continuation.resume(body)
}
} else {
continuation.resumeWithException(HttpException(response))
}
}
override fun onFailure(call: Call<T>, t: Throwable) {
// Don't bother with resuming the continuation if it is already cancelled.
if (continuation.isCancelled) return
continuation.resumeWithException(t)
}
})
registerOnCompletion(continuation)
}
}
然后从调用方我使用上面这样的方法
private fun getArticles() = launch(UI) {
loading.value = true
try {
val networkResult = api.getArticle().await()
articles.value = networkResult
}catch (e: Throwable){
e.printStackTrace()
message.value = e.message
}finally {
loading.value = false
}
}
我想在某些情况下以指数方式重试此 api 调用,即(IOException)我该如何实现??
我建议为您的重试逻辑编写一个帮助程序 higher-order function。您可以使用以下实现作为开始:
suspend fun <T> retryIO(
times: Int = Int.MAX_VALUE,
initialDelay: Long = 100, // 0.1 second
maxDelay: Long = 1000, // 1 second
factor: Double = 2.0,
block: suspend () -> T): T
{
var currentDelay = initialDelay
repeat(times - 1) {
try {
return block()
} catch (e: IOException) {
// you can log an error here and/or make a more finer-grained
// analysis of the cause to see if retry is needed
}
delay(currentDelay)
currentDelay = (currentDelay * factor).toLong().coerceAtMost(maxDelay)
}
return block() // last attempt
}
使用这个功能非常简单:
val networkResult = retryIO { api.getArticle().await() }
您可以根据具体情况更改重试参数,例如:
val networkResult = retryIO(times = 3) { api.doSomething().await() }
您还可以完全更改 retryIO
的实现以满足您的应用程序的需要。例如,您可以硬编码所有重试参数,摆脱对重试次数的限制,更改默认值等。
您可以通过简单的用法尝试这种简单但非常敏捷的方法:
编辑: 在单独的答案中添加了更复杂的解决方案。
class Completion(private val retry: (Completion) -> Unit) {
fun operationFailed() {
retry.invoke(this)
}
}
fun retryOperation(retries: Int,
dispatcher: CoroutineDispatcher = Dispatchers.Default,
operation: Completion.() -> Unit
) {
var tryNumber = 0
val completion = Completion {
tryNumber++
if (tryNumber < retries) {
GlobalScope.launch(dispatcher) {
delay(TimeUnit.SECONDS.toMillis(tryNumber.toLong()))
operation.invoke(it)
}
}
}
operation.invoke(completion)
}
像这样使用它:
retryOperation(3) {
if (!tryStuff()) {
// this will trigger a retry after tryNumber seconds
operationFailed()
}
}
您显然可以在此基础上构建更多内容。
这是我之前回答的更复杂和方便的版本,希望它能帮助别人:
class RetryOperation internal constructor(
private val retries: Int,
private val initialIntervalMilli: Long = 1000,
private val retryStrategy: RetryStrategy = RetryStrategy.LINEAR,
private val retry: suspend RetryOperation.() -> Unit
) {
var tryNumber: Int = 0
internal set
suspend fun operationFailed() {
tryNumber++
if (tryNumber < retries) {
delay(calculateDelay(tryNumber, initialIntervalMilli, retryStrategy))
retry.invoke(this)
}
}
}
enum class RetryStrategy {
CONSTANT, LINEAR, EXPONENTIAL
}
suspend fun retryOperation(
retries: Int = 100,
initialDelay: Long = 0,
initialIntervalMilli: Long = 1000,
retryStrategy: RetryStrategy = RetryStrategy.LINEAR,
operation: suspend RetryOperation.() -> Unit
) {
val retryOperation = RetryOperation(
retries,
initialIntervalMilli,
retryStrategy,
operation,
)
delay(initialDelay)
operation.invoke(retryOperation)
}
internal fun calculateDelay(tryNumber: Int, initialIntervalMilli: Long, retryStrategy: RetryStrategy): Long {
return when (retryStrategy) {
RetryStrategy.CONSTANT -> initialIntervalMilli
RetryStrategy.LINEAR -> initialIntervalMilli * tryNumber
RetryStrategy.EXPONENTIAL -> 2.0.pow(tryNumber).toLong()
}
}
用法:
coroutineScope.launch {
retryOperation(3) {
if (!tryStuff()) {
Log.d(TAG, "Try number $tryNumber")
operationFailed()
}
}
}
这里是 Flow
和 retryWhen
函数的例子
RetryWhen
分机:
fun <T> Flow<T>.retryWhen(
@FloatRange(from = 0.0) initialDelay: Float = RETRY_INITIAL_DELAY,
@FloatRange(from = 1.0) retryFactor: Float = RETRY_FACTOR_DELAY,
predicate: suspend FlowCollector<T>.(cause: Throwable, attempt: Long, delay: Long) -> Boolean
): Flow<T> = this.retryWhen { cause, attempt ->
val retryDelay = initialDelay * retryFactor.pow(attempt.toFloat())
predicate(cause, attempt, retryDelay.toLong())
}
用法:
flow {
...
}.retryWhen { cause, attempt, delay ->
delay(delay)
...
}
流版本https://github.com/hoc081098/FlowExt
package com.hoc081098.flowext
import kotlin.time.Duration
import kotlin.time.ExperimentalTime
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.retryWhen
@ExperimentalTime
public fun <T> Flow<T>.retryWithExponentialBackoff(
initialDelay: Duration,
factor: Double,
maxAttempt: Long = Long.MAX_VALUE,
maxDelay: Duration = Duration.INFINITE,
predicate: suspend (cause: Throwable) -> Boolean = { true }
): Flow<T> {
require(maxAttempt > 0) { "Expected positive amount of maxAttempt, but had $maxAttempt" }
return retryWhenWithExponentialBackoff(
initialDelay = initialDelay,
factor = factor,
maxDelay = maxDelay
) { cause, attempt -> attempt < maxAttempt && predicate(cause) }
}
@ExperimentalTime
public fun <T> Flow<T>.retryWhenWithExponentialBackoff(
initialDelay: Duration,
factor: Double,
maxDelay: Duration = Duration.INFINITE,
predicate: suspend FlowCollector<T>.(cause: Throwable, attempt: Long) -> Boolean
): Flow<T> = flow {
var currentDelay = initialDelay
retryWhen { cause, attempt ->
predicate(cause, attempt).also {
if (it) {
delay(currentDelay)
currentDelay = (currentDelay * factor).coerceAtMost(maxDelay)
}
}
}.let { emitAll(it) }
}
我正在使用 kotlin 协程进行网络请求,使用扩展方法调用 class 进行这样的改造
public suspend fun <T : Any> Call<T>.await(): T {
return suspendCancellableCoroutine { continuation ->
enqueue(object : Callback<T> {
override fun onResponse(call: Call<T>?, response: Response<T?>) {
if (response.isSuccessful) {
val body = response.body()
if (body == null) {
continuation.resumeWithException(
NullPointerException("Response body is null")
)
} else {
continuation.resume(body)
}
} else {
continuation.resumeWithException(HttpException(response))
}
}
override fun onFailure(call: Call<T>, t: Throwable) {
// Don't bother with resuming the continuation if it is already cancelled.
if (continuation.isCancelled) return
continuation.resumeWithException(t)
}
})
registerOnCompletion(continuation)
}
}
然后从调用方我使用上面这样的方法
private fun getArticles() = launch(UI) {
loading.value = true
try {
val networkResult = api.getArticle().await()
articles.value = networkResult
}catch (e: Throwable){
e.printStackTrace()
message.value = e.message
}finally {
loading.value = false
}
}
我想在某些情况下以指数方式重试此 api 调用,即(IOException)我该如何实现??
我建议为您的重试逻辑编写一个帮助程序 higher-order function。您可以使用以下实现作为开始:
suspend fun <T> retryIO(
times: Int = Int.MAX_VALUE,
initialDelay: Long = 100, // 0.1 second
maxDelay: Long = 1000, // 1 second
factor: Double = 2.0,
block: suspend () -> T): T
{
var currentDelay = initialDelay
repeat(times - 1) {
try {
return block()
} catch (e: IOException) {
// you can log an error here and/or make a more finer-grained
// analysis of the cause to see if retry is needed
}
delay(currentDelay)
currentDelay = (currentDelay * factor).toLong().coerceAtMost(maxDelay)
}
return block() // last attempt
}
使用这个功能非常简单:
val networkResult = retryIO { api.getArticle().await() }
您可以根据具体情况更改重试参数,例如:
val networkResult = retryIO(times = 3) { api.doSomething().await() }
您还可以完全更改 retryIO
的实现以满足您的应用程序的需要。例如,您可以硬编码所有重试参数,摆脱对重试次数的限制,更改默认值等。
您可以通过简单的用法尝试这种简单但非常敏捷的方法:
编辑: 在单独的答案中添加了更复杂的解决方案。
class Completion(private val retry: (Completion) -> Unit) {
fun operationFailed() {
retry.invoke(this)
}
}
fun retryOperation(retries: Int,
dispatcher: CoroutineDispatcher = Dispatchers.Default,
operation: Completion.() -> Unit
) {
var tryNumber = 0
val completion = Completion {
tryNumber++
if (tryNumber < retries) {
GlobalScope.launch(dispatcher) {
delay(TimeUnit.SECONDS.toMillis(tryNumber.toLong()))
operation.invoke(it)
}
}
}
operation.invoke(completion)
}
像这样使用它:
retryOperation(3) {
if (!tryStuff()) {
// this will trigger a retry after tryNumber seconds
operationFailed()
}
}
您显然可以在此基础上构建更多内容。
这是我之前回答的更复杂和方便的版本,希望它能帮助别人:
class RetryOperation internal constructor(
private val retries: Int,
private val initialIntervalMilli: Long = 1000,
private val retryStrategy: RetryStrategy = RetryStrategy.LINEAR,
private val retry: suspend RetryOperation.() -> Unit
) {
var tryNumber: Int = 0
internal set
suspend fun operationFailed() {
tryNumber++
if (tryNumber < retries) {
delay(calculateDelay(tryNumber, initialIntervalMilli, retryStrategy))
retry.invoke(this)
}
}
}
enum class RetryStrategy {
CONSTANT, LINEAR, EXPONENTIAL
}
suspend fun retryOperation(
retries: Int = 100,
initialDelay: Long = 0,
initialIntervalMilli: Long = 1000,
retryStrategy: RetryStrategy = RetryStrategy.LINEAR,
operation: suspend RetryOperation.() -> Unit
) {
val retryOperation = RetryOperation(
retries,
initialIntervalMilli,
retryStrategy,
operation,
)
delay(initialDelay)
operation.invoke(retryOperation)
}
internal fun calculateDelay(tryNumber: Int, initialIntervalMilli: Long, retryStrategy: RetryStrategy): Long {
return when (retryStrategy) {
RetryStrategy.CONSTANT -> initialIntervalMilli
RetryStrategy.LINEAR -> initialIntervalMilli * tryNumber
RetryStrategy.EXPONENTIAL -> 2.0.pow(tryNumber).toLong()
}
}
用法:
coroutineScope.launch {
retryOperation(3) {
if (!tryStuff()) {
Log.d(TAG, "Try number $tryNumber")
operationFailed()
}
}
}
这里是 Flow
和 retryWhen
函数的例子
RetryWhen
分机:
fun <T> Flow<T>.retryWhen(
@FloatRange(from = 0.0) initialDelay: Float = RETRY_INITIAL_DELAY,
@FloatRange(from = 1.0) retryFactor: Float = RETRY_FACTOR_DELAY,
predicate: suspend FlowCollector<T>.(cause: Throwable, attempt: Long, delay: Long) -> Boolean
): Flow<T> = this.retryWhen { cause, attempt ->
val retryDelay = initialDelay * retryFactor.pow(attempt.toFloat())
predicate(cause, attempt, retryDelay.toLong())
}
用法:
flow {
...
}.retryWhen { cause, attempt, delay ->
delay(delay)
...
}
流版本https://github.com/hoc081098/FlowExt
package com.hoc081098.flowext
import kotlin.time.Duration
import kotlin.time.ExperimentalTime
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.retryWhen
@ExperimentalTime
public fun <T> Flow<T>.retryWithExponentialBackoff(
initialDelay: Duration,
factor: Double,
maxAttempt: Long = Long.MAX_VALUE,
maxDelay: Duration = Duration.INFINITE,
predicate: suspend (cause: Throwable) -> Boolean = { true }
): Flow<T> {
require(maxAttempt > 0) { "Expected positive amount of maxAttempt, but had $maxAttempt" }
return retryWhenWithExponentialBackoff(
initialDelay = initialDelay,
factor = factor,
maxDelay = maxDelay
) { cause, attempt -> attempt < maxAttempt && predicate(cause) }
}
@ExperimentalTime
public fun <T> Flow<T>.retryWhenWithExponentialBackoff(
initialDelay: Duration,
factor: Double,
maxDelay: Duration = Duration.INFINITE,
predicate: suspend FlowCollector<T>.(cause: Throwable, attempt: Long) -> Boolean
): Flow<T> = flow {
var currentDelay = initialDelay
retryWhen { cause, attempt ->
predicate(cause, attempt).also {
if (it) {
delay(currentDelay)
currentDelay = (currentDelay * factor).coerceAtMost(maxDelay)
}
}
}.let { emitAll(it) }
}