RxJava2 从侦听器创建 Flowable 并在最后删除侦听器
RxJava2 create Flowable from listener and remove listener at the end
我的用例与将 RxJava2 与 Firebase 数据库结合使用有关。
我有一个 DatabaseReference,我可以向它注册值侦听器。
我可以像这样将它转换成可流动的:
disposable = Flowable.create<DataSnapshot>({ s ->
dbRef.addValueEventListener(object : ValueEventListener {
override fun onCancelled(p0: DatabaseError) {...}
override fun onDataChange(value: DataSnapshot) {
s.onNext(value)
}
})
}, BackpressureStrategy.BUFFER)
.subscribe(...)
我希望能够在处置一次性物品时移除监听器。
知道我该怎么做吗?
我看到在 rxjava 1 中可能有 this possibility,但在 rxjava2
中不可用
对于 RxJava2,您需要使用 setCancellable() 方法,并将您的侦听器删除代码放在那里。
这很像 Emitter.setCancellation() from RxJava1, when creating Observable with Observable.fromEmitter().
另请注意 akarnokd 关于取消的注释:
"But note that unless the create logic gives up the scheduler (by terminating or going async), the cancellation logic may not ever execute due to same-pool livelock."
(RxJava 2: always unsubscribe on the .subscribeOn(..) scheduler?)
我的用例与将 RxJava2 与 Firebase 数据库结合使用有关。
我有一个 DatabaseReference,我可以向它注册值侦听器。 我可以像这样将它转换成可流动的:
disposable = Flowable.create<DataSnapshot>({ s ->
dbRef.addValueEventListener(object : ValueEventListener {
override fun onCancelled(p0: DatabaseError) {...}
override fun onDataChange(value: DataSnapshot) {
s.onNext(value)
}
})
}, BackpressureStrategy.BUFFER)
.subscribe(...)
我希望能够在处置一次性物品时移除监听器。 知道我该怎么做吗?
我看到在 rxjava 1 中可能有 this possibility,但在 rxjava2
中不可用对于 RxJava2,您需要使用 setCancellable() 方法,并将您的侦听器删除代码放在那里。
这很像 Emitter.setCancellation() from RxJava1, when creating Observable with Observable.fromEmitter().
另请注意 akarnokd 关于取消的注释:
"But note that unless the create logic gives up the scheduler (by terminating or going async), the cancellation logic may not ever execute due to same-pool livelock."
(RxJava 2: always unsubscribe on the .subscribeOn(..) scheduler?)