RxJava 在完成后结合多个可观察对象做某事
RxJava combine multiple observables do sth after all done
我有一个方法可以使用多个可观察值重新加载数据库。毕竟它应该调用方法 setupPlace(selecetedPlace),但只能调用一次。每个 observable returns 个不同的对象(void、object、listOfObjects)。
private void reloadDatabaseFromRest(final Place selectedPlace, final Event selectedEvent) throws ParseException {
Observable.concat(DatabaseManager.getInstance().clearDatabase(),
mPlaceUseCase.getPlacesListFromRestObservable()
.filter(places -> places != null && !places.isEmpty())
.doOnNext(places -> mPlaceUseCase.savePlacesToRepository(places)),
mPlaceUseCase.saveSelectedPlace(selectedPlace),
mEventUseCase.getEvenListObservableFromRest(selectedPlace.getId())
.doOnNext(eventList -> mEventUseCase.saveEventsToRepository(eventList)),
mEventUseCase.saveSelectedEventObservable(selectedEvent))
.takeLast(1)
.subscribe(o -> mSplashScreenUI.setupPlace(selectedPlace));
}
但是此方法仅在最后一个可观察对象上执行 subsribe() 并且不会调用其他方法(我说得对吗?)。我尝试使用过滤器、zip 并制作愚蠢的 if(o instanceOf ArrayList) 子句,但这是错误的方法。有人可以帮帮我吗?
您可以使用 Completable
类型。 Completable 只是通知订阅者流已完成。
使用 Completable.fromObservable()
将您需要订阅的每个可观察对象包装到一个 Completable 中,然后将它们与 Completable.merge()
合并(并行开始)或使用 [=13 连接它们(一个接一个地串联) =].
注意: 确保包装到 Completable 中的所有 observable 都将调用 onComplete,因此不是无穷无尽的流。
非常感谢@koperko!它就像一个魅力,这正是我所需要的。
在重写方法期间,我注意到第一个 Observable 发出 6 个项目(clearDatabase()),所以我将它包装到 Completable 中并且它起作用了!
我没有使用 concat(),而是使用了 flatMap():
private void reloadDatabaseFromRest(final Place selectedPlace, final Event selectedEvent) {
Completable.fromObservable(
DatabaseManager.getInstance().clearDatabase()
.flatMap(aVoidDB -> mPlaceUseCase.getPlacesListFromRestObservable().filter(places -> places != null && !places.isEmpty()))
.flatMap(places -> mPlaceUseCase.savePlacesToRepository(places))
.flatMap(aVoidPlace -> mPlaceUseCase.saveSelectedPlace(selectedPlace))
.flatMap(aVoidSelectedPlace -> {
try {
return mEventUseCase.getEvenListObservableFromRest(selectedPlace.getId());
} catch (ParseException e) {
Log.e(TAG, e.getMessage());
return null;
}
})
.filter(eventList -> eventList != null)
.flatMap(eventList -> mEventUseCase.saveEventsToRepository(eventList))
.flatMap(aVoidEvents -> mEventUseCase.saveSelectedEventObservable(selectedEvent)))
.subscribe(() -> {
mSplashScreenUI.setupPlace(selectedPlace);
});
}
我有一个方法可以使用多个可观察值重新加载数据库。毕竟它应该调用方法 setupPlace(selecetedPlace),但只能调用一次。每个 observable returns 个不同的对象(void、object、listOfObjects)。
private void reloadDatabaseFromRest(final Place selectedPlace, final Event selectedEvent) throws ParseException {
Observable.concat(DatabaseManager.getInstance().clearDatabase(),
mPlaceUseCase.getPlacesListFromRestObservable()
.filter(places -> places != null && !places.isEmpty())
.doOnNext(places -> mPlaceUseCase.savePlacesToRepository(places)),
mPlaceUseCase.saveSelectedPlace(selectedPlace),
mEventUseCase.getEvenListObservableFromRest(selectedPlace.getId())
.doOnNext(eventList -> mEventUseCase.saveEventsToRepository(eventList)),
mEventUseCase.saveSelectedEventObservable(selectedEvent))
.takeLast(1)
.subscribe(o -> mSplashScreenUI.setupPlace(selectedPlace));
}
但是此方法仅在最后一个可观察对象上执行 subsribe() 并且不会调用其他方法(我说得对吗?)。我尝试使用过滤器、zip 并制作愚蠢的 if(o instanceOf ArrayList) 子句,但这是错误的方法。有人可以帮帮我吗?
您可以使用 Completable
类型。 Completable 只是通知订阅者流已完成。
使用 Completable.fromObservable()
将您需要订阅的每个可观察对象包装到一个 Completable 中,然后将它们与 Completable.merge()
合并(并行开始)或使用 [=13 连接它们(一个接一个地串联) =].
注意: 确保包装到 Completable 中的所有 observable 都将调用 onComplete,因此不是无穷无尽的流。
非常感谢@koperko!它就像一个魅力,这正是我所需要的。 在重写方法期间,我注意到第一个 Observable 发出 6 个项目(clearDatabase()),所以我将它包装到 Completable 中并且它起作用了! 我没有使用 concat(),而是使用了 flatMap():
private void reloadDatabaseFromRest(final Place selectedPlace, final Event selectedEvent) {
Completable.fromObservable(
DatabaseManager.getInstance().clearDatabase()
.flatMap(aVoidDB -> mPlaceUseCase.getPlacesListFromRestObservable().filter(places -> places != null && !places.isEmpty()))
.flatMap(places -> mPlaceUseCase.savePlacesToRepository(places))
.flatMap(aVoidPlace -> mPlaceUseCase.saveSelectedPlace(selectedPlace))
.flatMap(aVoidSelectedPlace -> {
try {
return mEventUseCase.getEvenListObservableFromRest(selectedPlace.getId());
} catch (ParseException e) {
Log.e(TAG, e.getMessage());
return null;
}
})
.filter(eventList -> eventList != null)
.flatMap(eventList -> mEventUseCase.saveEventsToRepository(eventList))
.flatMap(aVoidEvents -> mEventUseCase.saveSelectedEventObservable(selectedEvent)))
.subscribe(() -> {
mSplashScreenUI.setupPlace(selectedPlace);
});
}