如何在 RxJava2 中使用重试运算符记住状态
How to remember state with retry operators in RxJava2
我有一个能够从中断中恢复的网络客户端,但在重试时需要最后一条消息。
Kotlin 中的示例:
fun requestOrResume(last: Message? = null): Flowable<Message> =
Flowable.create({ emitter ->
val connection = if (last != null)
client.start()
else
client.resumeFrom(last.id)
while (!emitter.isDisposed) {
val msg = connection.nextMessage()
emitter.onNext(msg)
}
}, BackpressureStrategy.MISSING)
requestOrResume()
.retryWhen { it.flatMap { Flowable.timer(5, SECONDS) } }
// how to pass the resume data when there is a retry?
问题:如您所见,我需要最后收到的消息才能准备恢复通话。我如何跟踪它以便在重试时可以发出恢复请求?
一个可能的解决方案可能是创建一个持有者 class,它只持有对最后一条消息的引用,并在收到新消息时更新。这样,当重试时,可以从持有者那里获得最后一条消息。示例:
class MsgHolder(var last: Message? = null)
fun request(): Flowable<Message> {
val holder = MsgHolder()
return Flowable.create({ emitter ->
val connection = if (holder.last != null)
client.start()
else
client.resumeFrom(holder.last.id)
while (!emitter.isDisposed) {
val msg = connection.nextMessage()
holder.last = msg // <-- update holder reference
emitter.onNext(msg)
}
}, BackpressureStrategy.MISSING)
}
我认为这可能行得通,但感觉像是一个 hack(线程同步问题?)。
是否有更好的方法来跟踪状态以便重试?
仔细查看 buffer()
运算符:link
你可以这样使用它:
requestOrResume()
.buffer(2)
从现在开始,您的 Flowable
将 return List<Message>
有两个最新对象
请注意,除非您在最后一个元素周围重新抛出一个包装器(在功能上与您现有的 "hack"-ish 解决方案没有太大区别,但在我看来更丑陋),否则没有任何错误处理运算符可以在没有一些错误的情况下恢复最后一个元素外部帮助,因为他们只能访问 Throwable
的流。相反,请查看以下递归方法是否适合您的需要:
fun retryWithLast(seed: Flowable<Message>): Flowable<Message> {
val last$ = seed.last().cache();
return seed.onErrorResumeNext {
it.flatMap {
retryWithLast(last$.flatMap {
requestOrResume(it)
})
}
};
}
retryWithLast(requestOrResume());
最大的区别是使用 cache
在 Observable 中缓存最后一次尝试的尾随值,而不是在值中手动这样做。另请注意,错误处理程序中的递归意味着如果后续尝试继续失败,retryWithLast
将继续扩展流。
我有一个能够从中断中恢复的网络客户端,但在重试时需要最后一条消息。
Kotlin 中的示例:
fun requestOrResume(last: Message? = null): Flowable<Message> =
Flowable.create({ emitter ->
val connection = if (last != null)
client.start()
else
client.resumeFrom(last.id)
while (!emitter.isDisposed) {
val msg = connection.nextMessage()
emitter.onNext(msg)
}
}, BackpressureStrategy.MISSING)
requestOrResume()
.retryWhen { it.flatMap { Flowable.timer(5, SECONDS) } }
// how to pass the resume data when there is a retry?
问题:如您所见,我需要最后收到的消息才能准备恢复通话。我如何跟踪它以便在重试时可以发出恢复请求?
一个可能的解决方案可能是创建一个持有者 class,它只持有对最后一条消息的引用,并在收到新消息时更新。这样,当重试时,可以从持有者那里获得最后一条消息。示例:
class MsgHolder(var last: Message? = null)
fun request(): Flowable<Message> {
val holder = MsgHolder()
return Flowable.create({ emitter ->
val connection = if (holder.last != null)
client.start()
else
client.resumeFrom(holder.last.id)
while (!emitter.isDisposed) {
val msg = connection.nextMessage()
holder.last = msg // <-- update holder reference
emitter.onNext(msg)
}
}, BackpressureStrategy.MISSING)
}
我认为这可能行得通,但感觉像是一个 hack(线程同步问题?)。
是否有更好的方法来跟踪状态以便重试?
仔细查看 buffer()
运算符:link
你可以这样使用它:
requestOrResume()
.buffer(2)
从现在开始,您的 Flowable
将 return List<Message>
有两个最新对象
请注意,除非您在最后一个元素周围重新抛出一个包装器(在功能上与您现有的 "hack"-ish 解决方案没有太大区别,但在我看来更丑陋),否则没有任何错误处理运算符可以在没有一些错误的情况下恢复最后一个元素外部帮助,因为他们只能访问 Throwable
的流。相反,请查看以下递归方法是否适合您的需要:
fun retryWithLast(seed: Flowable<Message>): Flowable<Message> {
val last$ = seed.last().cache();
return seed.onErrorResumeNext {
it.flatMap {
retryWithLast(last$.flatMap {
requestOrResume(it)
})
}
};
}
retryWithLast(requestOrResume());
最大的区别是使用 cache
在 Observable 中缓存最后一次尝试的尾随值,而不是在值中手动这样做。另请注意,错误处理程序中的递归意味着如果后续尝试继续失败,retryWithLast
将继续扩展流。