RxJava2 从侦听器创建 Flowable 并在最后删除侦听器

RxJava2 create Flowable from listener and remove listener at the end

我的用例与将 RxJava2 与 Firebase 数据库结合使用有关。

我有一个 DatabaseReference,我可以向它注册值侦听器。 我可以像这样将它转换成可流动的:

disposable = Flowable.create<DataSnapshot>({ s ->
            dbRef.addValueEventListener(object : ValueEventListener {
                override fun onCancelled(p0: DatabaseError) {...}

                override fun onDataChange(value: DataSnapshot) {
                    s.onNext(value)
                }
            })
        }, BackpressureStrategy.BUFFER)
        .subscribe(...)

我希望能够在处置一次性物品时移除监听器。 知道我该怎么做吗?

我看到在 rxjava 1 中可能有 this possibility,但在 rxjava2

中不可用

对于 RxJava2,您需要使用 setCancellable() 方法,并将您的侦听器删除代码放在那里。
这很像 Emitter.setCancellation() from RxJava1, when creating Observable with Observable.fromEmitter().

另请注意 akarnokd 关于取消的注释:
"But note that unless the create logic gives up the scheduler (by terminating or going async), the cancellation logic may not ever execute due to same-pool livelock." (RxJava 2: always unsubscribe on the .subscribeOn(..) scheduler?)