RxJava 延迟 observables 触发意外延迟
RxJava delayed observables fire with unexpected delays
我使用 RxJava observables 设置了一个事件序列。基本上,我将使用 Observable.just(Events.*)
创建的不同事件与使用 observable.delay(time, timeUnit, scheduler)
函数设置的不同延迟合并。然后我 post 它们到 PublishSubject
(下面代码中的 events
)并订阅那个 PublishSubject
来观察序列(下面代码中的 observeEvents()
函数).它曾经工作正常,但最近我在我的设备上看到一个非常奇怪的行为(OnePlus One with android 5.0.2)(并且在模拟器上看不到它)。基本上事件会混淆,延迟较高的事件可以在延迟较小的事件之前发生,延迟较小的事件可以排在队列的末尾,有时所有事件都可能按正确的顺序发生。前 3 个事件特别经常混合。有时根本没有观察到某些事件。这里会发生什么?
代码在 Kotlin 中:
var computationScheduler = Schedulers.computation()
private val events: PublishSubject<Events> = PublishSubject.create()
private val userActionSubject: PublishSubject<Events> = PublishSubject.create()
Observable.merge(
event0(),
event1(),
event2(),
userActionOrEvent3(),
userActionOrEvent4())
.subscribe({
// Weird timings are observed here already
events.onNext(it)
}, { e ->
events.onError(e)
}))
private fun userActionOrEvent4(): Observable<Events> {
return Observable.amb(Observable.just(Events.Event4)
.delay(12800, TimeUnit.MILLISECONDS, computationScheduler), userActionSubject.asObservable().subscribeOn(computationScheduler))
.take(1)
}
private fun userActionOrEvent3(): Observable<Events> {
return Observable.amb(Observable.just(Events.Event3)
.delay(2800, TimeUnit.MILLISECONDS, computationScheduler), userActionSubject.asObservable().subscribeOn(computationScheduler))
.take(1)
}
private fun event2() = Observable.just(Events.Event2)
.delay(1800, TimeUnit.MILLISECONDS, computationScheduler)
private fun event1() = Observable.just(Events.Event1)
.delay(200, TimeUnit.MILLISECONDS, computationScheduler)
private fun event0() = Observable.just(Events.Event0)
.subscribeOn(computationScheduler)
open fun observeEvents(): Observable<Events> = events.asObservable().observeOn(AndroidSchedulers.mainThread())
open fun onUserAction() {
userActionSubject.onNext(Events.Action)
}
这是因为您使用的是 merge()
而不是 concat
这篇中篇文章将向您解释其中的区别https://medium.com/fueled-android/rxify-a-simple-spell-for-complex-rxjava-operators-part-2-b82b379f5c7f#.ekppat50o
原来问题是由 computationScheduler 引起的,当我将 Schedulers.computation()
更改为 Schedulers.newThread()
事件开始按预期时间触发时。
我使用 RxJava observables 设置了一个事件序列。基本上,我将使用 Observable.just(Events.*)
创建的不同事件与使用 observable.delay(time, timeUnit, scheduler)
函数设置的不同延迟合并。然后我 post 它们到 PublishSubject
(下面代码中的 events
)并订阅那个 PublishSubject
来观察序列(下面代码中的 observeEvents()
函数).它曾经工作正常,但最近我在我的设备上看到一个非常奇怪的行为(OnePlus One with android 5.0.2)(并且在模拟器上看不到它)。基本上事件会混淆,延迟较高的事件可以在延迟较小的事件之前发生,延迟较小的事件可以排在队列的末尾,有时所有事件都可能按正确的顺序发生。前 3 个事件特别经常混合。有时根本没有观察到某些事件。这里会发生什么?
代码在 Kotlin 中:
var computationScheduler = Schedulers.computation()
private val events: PublishSubject<Events> = PublishSubject.create()
private val userActionSubject: PublishSubject<Events> = PublishSubject.create()
Observable.merge(
event0(),
event1(),
event2(),
userActionOrEvent3(),
userActionOrEvent4())
.subscribe({
// Weird timings are observed here already
events.onNext(it)
}, { e ->
events.onError(e)
}))
private fun userActionOrEvent4(): Observable<Events> {
return Observable.amb(Observable.just(Events.Event4)
.delay(12800, TimeUnit.MILLISECONDS, computationScheduler), userActionSubject.asObservable().subscribeOn(computationScheduler))
.take(1)
}
private fun userActionOrEvent3(): Observable<Events> {
return Observable.amb(Observable.just(Events.Event3)
.delay(2800, TimeUnit.MILLISECONDS, computationScheduler), userActionSubject.asObservable().subscribeOn(computationScheduler))
.take(1)
}
private fun event2() = Observable.just(Events.Event2)
.delay(1800, TimeUnit.MILLISECONDS, computationScheduler)
private fun event1() = Observable.just(Events.Event1)
.delay(200, TimeUnit.MILLISECONDS, computationScheduler)
private fun event0() = Observable.just(Events.Event0)
.subscribeOn(computationScheduler)
open fun observeEvents(): Observable<Events> = events.asObservable().observeOn(AndroidSchedulers.mainThread())
open fun onUserAction() {
userActionSubject.onNext(Events.Action)
}
这是因为您使用的是 merge()
而不是 concat
这篇中篇文章将向您解释其中的区别https://medium.com/fueled-android/rxify-a-simple-spell-for-complex-rxjava-operators-part-2-b82b379f5c7f#.ekppat50o
原来问题是由 computationScheduler 引起的,当我将 Schedulers.computation()
更改为 Schedulers.newThread()
事件开始按预期时间触发时。