RxJava 2.x - 一个 Observable 获取由 PublishSubject 触发并与其他 Observables 合并的 flatMap 没有得到 subscribed/executed
RxJava 2.x - An Observable getting flatMap triggered by a PublishSubject and merged with other Observables does not get subscribed/executed
我有一个使用 PublishSubject
的分页解决方案,如下所示:
private val pages: PublishSubject<Int> = PublishSubject.create()
val observable: Observable<List<Data> = pages.hide()
.filter { !inFlight }
.doOnNext { inFlight = true }
.flatMap{
getPage(it) // Returns an Observable
}
.doOnNext(::onNextPage) // inFlight gets reset here
此 Observable
与其他 Observable
合并并扫描,如下所示:
fun stateObservable(): Observable<SavedState> {
return Observable.merge(listOf(firstPage(),
nextPage(),// The observable listed above
refresh()))
.scan(MyState.initialState(), StateReducer::reduce)
}
基本上我有一个单向设置,其中每个可观察到的更新 MyState
借助累加器函数 reduce
的相关更改。
在 ViewModel
中,这是直接消耗的:
interactor.stateObservable()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeBy(onNext = ::render, onError = Timber::e)
.addTo(subscriptions)
此设置适用于 firstPage
以及 refresh
(也在 PublishSubject
的帮助下触发)但由于某种原因,分页解决方案返回 getPage
Observable
在 flatMap
但此页面 Observable
永远不会 triggered/subscribed 和 flatMap
之后的 doOnNext
显然没有'也不会被叫到。好像基本上不想订阅了,我也不知道为什么。
getPage
函数如下所示:
private fun getPage(page: Long): Observable<PartialState<SavedState>> {
return repo.getPage(page).firstOrError().toObservable()
.subscribeOn(Schedulers.io())
.map<PartialState<MyState>> { NextPageLoaded(it) }
.onErrorReturn { NextPageError(it) }
.startWith { NextPageLoading() }
}
repo 中的 getPage
在 RxJavaInterop
的帮助下,通过以下方式将 RxJava 1 Observable
转换为 RxJava2 Observable
:
public io.reactivex.Observable<List<Data>> getPage(long page) {
Observable<List<Data>> observable = getPage(page)
.map(dataList -> {
if(dataList == null){
dataList = new ArrayList<>();
}
return dataList;
});
return RxJavaInterop.toV2Observable(observable);
}
我没有收到任何错误,因此您可以排除这种情况。
我已经有了与 RxJava 1 完全相同的设置,它工作得很好,现在当我迁移到 2.x 时,我期待相同的解决方案可以工作,但我完全坚持这个分页问题以及在所有其他情况下设置都按预期工作。
为了能够测试这个问题,我在 GitHub 上上传了一个示例项目来演示这个问题。
有哪位 RxJava 专家知道它可能是什么? :)
谢谢
我发现了一个问题:过度使用 {}
为 startWith
创建的 lambda 在 nextPageObservable
中什么都不做(因此永远不会切换到页面链) .
.startWith { NextPageLoading() }
改为:
.startWith ( NextPageLoading() )
但是,通过此更改,您的代码会由于其他地方的空值而产生 IAE:
java.lang.IllegalArgumentException: Parameter specified as non-null is null: method kotlin.jvm.internal.Intrinsics.checkParameterIsNotNull, parameter page
at com.paginationissue.ui.list.PaginationIssueInteractor.invoke(Unknown Source:2)
at com.paginationissue.ui.list.PaginationIssueInteractor.invoke(PaginationIssueInteractor.kt:15)
at com.paginationissue.paging.PageNumberTokenStrategy.generateNextPageToken(PageNumberTokenStrategy.kt:21)
at com.paginationissue.paging.PageNumberTokenStrategy.generateNextPageToken(PageNumberTokenStrategy.kt:12)
at com.paginationissue.paging.Pager.onNextPage(Pager.kt:92)
我有一个使用 PublishSubject
的分页解决方案,如下所示:
private val pages: PublishSubject<Int> = PublishSubject.create()
val observable: Observable<List<Data> = pages.hide()
.filter { !inFlight }
.doOnNext { inFlight = true }
.flatMap{
getPage(it) // Returns an Observable
}
.doOnNext(::onNextPage) // inFlight gets reset here
此 Observable
与其他 Observable
合并并扫描,如下所示:
fun stateObservable(): Observable<SavedState> {
return Observable.merge(listOf(firstPage(),
nextPage(),// The observable listed above
refresh()))
.scan(MyState.initialState(), StateReducer::reduce)
}
基本上我有一个单向设置,其中每个可观察到的更新 MyState
借助累加器函数 reduce
的相关更改。
在 ViewModel
中,这是直接消耗的:
interactor.stateObservable()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeBy(onNext = ::render, onError = Timber::e)
.addTo(subscriptions)
此设置适用于 firstPage
以及 refresh
(也在 PublishSubject
的帮助下触发)但由于某种原因,分页解决方案返回 getPage
Observable
在 flatMap
但此页面 Observable
永远不会 triggered/subscribed 和 flatMap
之后的 doOnNext
显然没有'也不会被叫到。好像基本上不想订阅了,我也不知道为什么。
getPage
函数如下所示:
private fun getPage(page: Long): Observable<PartialState<SavedState>> {
return repo.getPage(page).firstOrError().toObservable()
.subscribeOn(Schedulers.io())
.map<PartialState<MyState>> { NextPageLoaded(it) }
.onErrorReturn { NextPageError(it) }
.startWith { NextPageLoading() }
}
repo 中的 getPage
在 RxJavaInterop
的帮助下,通过以下方式将 RxJava 1 Observable
转换为 RxJava2 Observable
:
public io.reactivex.Observable<List<Data>> getPage(long page) {
Observable<List<Data>> observable = getPage(page)
.map(dataList -> {
if(dataList == null){
dataList = new ArrayList<>();
}
return dataList;
});
return RxJavaInterop.toV2Observable(observable);
}
我没有收到任何错误,因此您可以排除这种情况。
我已经有了与 RxJava 1 完全相同的设置,它工作得很好,现在当我迁移到 2.x 时,我期待相同的解决方案可以工作,但我完全坚持这个分页问题以及在所有其他情况下设置都按预期工作。
为了能够测试这个问题,我在 GitHub 上上传了一个示例项目来演示这个问题。
有哪位 RxJava 专家知道它可能是什么? :)
谢谢
我发现了一个问题:过度使用 {}
为 startWith
创建的 lambda 在 nextPageObservable
中什么都不做(因此永远不会切换到页面链) .
.startWith { NextPageLoading() }
改为:
.startWith ( NextPageLoading() )
但是,通过此更改,您的代码会由于其他地方的空值而产生 IAE:
java.lang.IllegalArgumentException: Parameter specified as non-null is null: method kotlin.jvm.internal.Intrinsics.checkParameterIsNotNull, parameter page
at com.paginationissue.ui.list.PaginationIssueInteractor.invoke(Unknown Source:2)
at com.paginationissue.ui.list.PaginationIssueInteractor.invoke(PaginationIssueInteractor.kt:15)
at com.paginationissue.paging.PageNumberTokenStrategy.generateNextPageToken(PageNumberTokenStrategy.kt:21)
at com.paginationissue.paging.PageNumberTokenStrategy.generateNextPageToken(PageNumberTokenStrategy.kt:12)
at com.paginationissue.paging.Pager.onNextPage(Pager.kt:92)