使用 Monix Observable 的 .onErrorRestartIf 时限制重试次数?
Limit the number of retries when using Monix Observable's .onErrorRestartIf?
Monix observables 具有 API .onErrorRestartIf(f: Throwable => Boolean)
和 .onErrorRestart(times: Int)
。如何指定它应该重试的最大次数 .onErrorRestartIf?
警告:这使用共享可变状态,对于冷可观察对象可能不正确。请参阅 Alexandru 的回答。
定义一个函数来完成它:
def limitedRetries(maxRetries: AtomicInt, shouldRetryOnException: Throwable => Boolean): Throwable => Boolean =
ex => maxRetries.decrementAndGet() > 0 && shouldRetryOnException(ex)
并在onErrorRestartIf
中使用这个函数
.onErrorRestartIf(limitedRetries(AtomicInt(maxRetries), shouldRestart))
仅供参考,这里使用了 monix AtomicInt...
import monix.execution.atomic.AtomicInt
您可以根据 onErrorHandleWith
:
构建自己的循环
def retryLimited[A](fa: Observable[A], maxRetries: Int)
(p: Throwable => Boolean): Observable[A] = {
// If we have no retries left, return the source untouched
if (maxRetries <= 0) fa else
fa.onErrorHandleWith { err =>
// If predicate holds, do recursive call
if (p(err))
retryLimited(fa, maxRetries - 1)(p)
else
Observable.raiseError(err)
}
}
如果你不喜欢简单的函数(我喜欢),你总是可以公开一些扩展方法作为替代:
implicit class ObservableExtensions[A](val self: Observable[A])
extends AnyVal {
def onErrorRetryLimited(maxRetries: Int)
(p: Throwable => Boolean): Observable[A] =
retryLimited(self, maxRetries)(p)
}
请注意@JVS 的回答在精神上是可以的,但可能会有问题,因为它保持共享的可变状态,这对于冷可观察对象来说是不正确的。所以注意如果你这样做会发生什么:
val source = Observable.suspend {
if (Random.nextInt() % 10 != 0)
Observable.raiseError(new RuntimeException("dummy"))
else
Observable(1, 2, 3)
}
val listT = source
.onErrorRestartIf(limitedRetries(AtomicInt(maxRetries), shouldRestart))
.toListL
listT.runAsync // OK
listT.runAsync // Ooops, shared state, we might not have retries left
警惕 Observable 运算符中的可变共享状态。你当然可以这样工作,但你必须意识到其中的危险:-)
Monix observables 具有 API .onErrorRestartIf(f: Throwable => Boolean)
和 .onErrorRestart(times: Int)
。如何指定它应该重试的最大次数 .onErrorRestartIf?
警告:这使用共享可变状态,对于冷可观察对象可能不正确。请参阅 Alexandru 的回答。
定义一个函数来完成它:
def limitedRetries(maxRetries: AtomicInt, shouldRetryOnException: Throwable => Boolean): Throwable => Boolean =
ex => maxRetries.decrementAndGet() > 0 && shouldRetryOnException(ex)
并在onErrorRestartIf
.onErrorRestartIf(limitedRetries(AtomicInt(maxRetries), shouldRestart))
仅供参考,这里使用了 monix AtomicInt...
import monix.execution.atomic.AtomicInt
您可以根据 onErrorHandleWith
:
def retryLimited[A](fa: Observable[A], maxRetries: Int)
(p: Throwable => Boolean): Observable[A] = {
// If we have no retries left, return the source untouched
if (maxRetries <= 0) fa else
fa.onErrorHandleWith { err =>
// If predicate holds, do recursive call
if (p(err))
retryLimited(fa, maxRetries - 1)(p)
else
Observable.raiseError(err)
}
}
如果你不喜欢简单的函数(我喜欢),你总是可以公开一些扩展方法作为替代:
implicit class ObservableExtensions[A](val self: Observable[A])
extends AnyVal {
def onErrorRetryLimited(maxRetries: Int)
(p: Throwable => Boolean): Observable[A] =
retryLimited(self, maxRetries)(p)
}
请注意@JVS 的回答在精神上是可以的,但可能会有问题,因为它保持共享的可变状态,这对于冷可观察对象来说是不正确的。所以注意如果你这样做会发生什么:
val source = Observable.suspend {
if (Random.nextInt() % 10 != 0)
Observable.raiseError(new RuntimeException("dummy"))
else
Observable(1, 2, 3)
}
val listT = source
.onErrorRestartIf(limitedRetries(AtomicInt(maxRetries), shouldRestart))
.toListL
listT.runAsync // OK
listT.runAsync // Ooops, shared state, we might not have retries left
警惕 Observable 运算符中的可变共享状态。你当然可以这样工作,但你必须意识到其中的危险:-)