为 Observable 流实现 "valve",包括缓冲阀门重新打开前发出的最后一个元素

Implementation of a "valve" for Observable streams, including buffering the last element emmitted before the valve reopened

我正在努力思考如何在 RxJava (2.0) 中实现某些东西。它适用于 Android,我使用的是 Kotlin,尽管平台和语言的选择在这里无关紧要。

我的想法是,我将在 RxJava 上建立某种 MVP 架构。在此实现中,我正在考虑 Activity(也可以是 Fragment 或自定义 View)公开值流(为简单起见,Booleans)指示生命周期事件,或者视图是附加还是分离。

基本思想基本上是这样的:

private val lifecycleEvents = PublishSubject.create<Boolean>()
val screenStates: Observable<Boolean> = lifecycleEvents.hide()

override fun onResume() {
    super.onResume()
    lifecycleEvents.onNext(true) // I'm attached!
}

override fun onPause() {
    lifecycleEvents.onNext(false) // I'm detached!
    super.onPause()
}

override fun onDestroy() {
    lifecycleEvents.onComplete() // I'm gone        
    super.onDestroy()
}

然后从另一端,Presenter 公开一个 Observable,它是表示屏幕状态的对象流 - 将由 View 呈现。

(这遵循本系列中解释的概念 http://hannesdorfmann.com/android/mosby3-mvi-1 - 归结为这样一个事实:Presenter 为 View 提供独立对象,封装整个屏幕状态,而不是 View 上的多个不同方法) .

然后我想绑定这两个可观察流,以便:

它将按如下方式工作(为简单起见,假设状态是 String 类型):

val merged: Observable<String> = ???

val attached = true
val disattached = false        

screenStates.onNext(attached)
fromPresenter.onNext("state A")
fromPresenter.onNext("state B")

screenStates.onNext(disattached)
fromPresenter.onNext("state C") // this won't survive at the end
fromPresenter.onNext("state D") // this will "override" the previous one.
// as that's the last state from BEFORE the screen is reattached

screenStates.onNext(attached)
// "state D" should be replayed at this point, "state C" is skipped and lost

fromPresenter.onNext("state E")

// what "merged" is supposed to have received at this point:
// "state A", "state B", "state D", "state E"

我不确定最好的惯用解决方案是什么。

我试图将其实现为 ObservableTransformer,但我不太正确。我相信转换器应该是无状态的,而我的解决方案倾向于显式跟踪发出的内容并缓冲最后一个元素 "manually" 等,这感觉很混乱而且过于迫切,所以我认为这是错误的。

我找到了 https://github.com/akarnokd/RxJava2Extensions/blob/master/src/main/java/hu/akarnokd/rxjava2/operators/FlowableValve.java,但是实现看起来非常复杂,我不敢相信它不能以更简单的方式完成(我不需要所有的灵活性,我只想要一些东西适用于描述的用例)。

任何见解都将不胜感激,包括在 Android 的上下文中是否还有其他我应该考虑的事情。另请注意,我不使用 RxKotlin 绑定(我可能,我只是不认为这里应该需要它们)。

编辑:

下面是我当前的实现。正如我所说,我对此不太满意,因为它是明确有状态的,我相信这应该以声明方式实现,利用 RxJava 的一些构造。

我需要合并两个不同类型的流,因为 combineLatestzip 都没有完全做到,所以我使用了一个技巧,为两种不同类型的流创建了一个通用包装器事件。又引入了一定的开销。

sealed class Event
class StateEvent(val state: String): Event()
class LifecycleEvent(val attached: Boolean): Event()

class ValveTransformer(val valve: Observable<Boolean>) : ObservableTransformer<String, String> {
    var lastStateEvent: Event? = null
    var lastLifecycleEvent = LifecycleEvent(false)

    private fun buffer(event: StateEvent) {
        lastStateEvent = event
    }

    private fun buffer(event: LifecycleEvent) {
        lastLifecycleEvent = event
    }

    private fun popLastState(): String {
        val bufferedState = (lastStateEvent as StateEvent).state
        lastStateEvent = null
        return bufferedState
    }

    override fun apply(upstream: Observable<String>): ObservableSource<String> = Observable
            .merge(
                    upstream.map(::StateEvent).doOnNext { buffer(it) }, 
                    valve.distinctUntilChanged().map(::LifecycleEvent).doOnNext { buffer (it) })
            .switchMap { when {
                it is LifecycleEvent && it.attached && lastStateEvent != null ->
                    // the screen is attached now, pump the pending state out of the buffer
                    just(popLastState())
                it is StateEvent && lastLifecycleEvent.attached -> just(it.state)
                else -> empty<String>()
            } }
}

在我看来,您正在寻找一个 BehaviorSubject - 这是一个向每个订阅的观察者发出它观察到的最新项目以及所有后续观察到的项目的主题。

如果您在 Presenter 中使用它,请在分离视图时取消订阅它,并在附加视图时订阅它,您应该会得到想要的。

将@TpoM6oH 的回答与原提案相结合:

val bufferedEvent: Observable<Event> = BehaviorSubject.create()
bufferedEventResult = valve.switchMap( 
     viewEvent -> if (viewEvent) 
                       bufferedEvent 
                  else Observable.never() )

switchMap() 运算符负责订阅和取消订阅。

然后您可以使用 publish() 将生成的可观察对象拆分为必要的状态和事件。我不确定 ObservableTransformer 的需求是什么。