使用 Kotlin 扩展函数将 RxJava 可观察对象转换为实时数据

Convert RxJava Observables To Live Data With Kotlin Extension Functions

我一直在使用 LiveDataReactiveStreams.fromPublisher() 库在我的代码中使用大量 RxJava Observables 转换为 LiveData。所以我想在 RxJava Observable 中添加一个扩展函数来轻松地将它们转换为 LiveData.

这些是我的扩展函数:

fun <T> Flowable<T>.toLiveData() :  LiveData<T> {
    return LiveDataReactiveStreams.fromPublisher(this)
}

fun <T> Observable<T>.toLiveData(backPressureStrategy: BackpressureStrategy) :  LiveData<T> {
    return LiveDataReactiveStreams.fromPublisher(this.toFlowable(backPressureStrategy))
}

fun <T> Single<T>.toLiveData() :  LiveData<T> {
    return LiveDataReactiveStreams.fromPublisher(this.toFlowable())
}

fun <T> Maybe<T>.toLiveData() :  LiveData<T> {
    return LiveDataReactiveStreams.fromPublisher(this.toFlowable())
}

fun <T> Completable.toLiveData() :  LiveData<T> {
    return LiveDataReactiveStreams.fromPublisher(this.toFlowable())
}

我的问题是:

  1. 这是个好主意吗?
  2. 有更好的方法吗?
  3. 这些扩展功能可以更好吗?

P.S.

我是 Kotlin 的新手,所以我问这些问题。任何有用的东西将不胜感激。非常感谢。

这些函数可以更短一些,但这根本不是一个好主意。 Rx 提供了比 LiveData 监听器更多的可能性

如果您想同时使用 LiveData 和 Rx,您的解决方案很好。

如果您只想自动处理您的订阅,您可以像这样在 Disposable 上实现它:

private class LifecycleDisposable(obj: Disposable) :
        DefaultLifecycleObserver, Disposable by obj {
    override fun onStop(owner: LifecycleOwner) {
        if (!isDisposed) {
            dispose()
        }
    }
}

fun Disposable.attachToLifecycle(owner: LifecycleOwner) {
    owner.lifecycle.addObserver(LifecycleDisposable(this))
}

并这样称呼它

Observable.just(1, 2, 3).subscribe().attachToLifecycle(this)

其中 this 引用任何 LifecycleOwner。

简而言之:


我认为这是个不错的主意。 LiveData 的一个好处是可以直接在数据绑定布局中使用它。假设在您的视图模型中您有:

val user: LiveData<User>

data class User(val firstName: String, val lastName: String)

在您的布局中,您可以直接绑定 User 的属性:

android:text="${viewModel.user.firstName}"

你不能像这样在数据绑定中使用反应流。如果 userFlowable<User>,引用 ${viewModel.user.firstName} 将不起作用。

此外,如果您的 activity 或片段调用 ViewDataBinding.setLifecycleOwner(LifecycleOwner):

binding.setLifecycleOwner(this)

Completable 转换为 LiveData<T> 的那个对我来说真的没有意义,因为它永远不会通知观察者任何事情,所以我只是摆脱它。

从反应流转换为实时数据时有一些注意事项(就像我想 ), but I don't think they are related to the extension functions you presented, those seem to do their job. The issue to keep in mind here is that when the lifecycle owner moves from active to inactive state, PublisherLiveData 取消对流的订阅,当状态变为活动时,它会创建一个新的订阅,这意味着在许多情况下重新启动流(我猜那是如果流是“冷的”),而您可能想从旋转或其他配置更改后的位置恢复流。如果流是“热的”另一方面,排放在非活动状态下被忽略。我认为即使您直接使用反应流并手动处理生命周期,也必须解决这个问题。但问题是,简单地将反应流转换为 LiveData 是足以解决这个问题。

最好将这些方法记录为不处理必须在上游处理的错误状态。或者,这可能是这些功能的改进之一——首先转换流以处理错误(例如,作为具有默认值的 lambda 参数)。另一种可能性是利用 Result(目前处于试验阶段)或类似的东西来封装成功或错误。


事后想想,关于我上面写的这部分:

There are some considerations when converting from reactive streams to live data, but I don't think they are related to the extension functions you presented.

我仍然认为它在一般情况下是正确的,但是我不确定你是否真的想在大多数时候在实践中使用 Single.toLiveData()Maybe.toLiveData()。由于 MaybeSingle 是对一次性操作进行建模,因此当没有活跃的观察者时最好不要取消它,一旦有新的活跃观察者就必须重新启动它。相反,发布到某些 MutableLiveData 并在 onCleared 中处理 Single/Maybe 可能会有用(我不确定是否可以封装在扩展函数中)。它们可能仍然有一些我现在根本看不到的用途。

顺便说一下,你的 Flowable.toLiveData() 已经在 androidx.lifecycle:lifecycle-reactivestreams-ktx artifact 中了。

剩下 Observable.toLiveData(),我认为它应该和 Flowable 一样有用。