RxJava 在完成后结合多个可观察对象做某事

RxJava combine multiple observables do sth after all done

我有一个方法可以使用多个可观察值重新加载数据库。毕竟它应该调用方法 setupPlace(selecetedPlace),但只能调用一次。每个 observable returns 个不同的对象(void、object、listOfObjects)。

private void reloadDatabaseFromRest(final Place selectedPlace, final Event selectedEvent) throws ParseException {
    Observable.concat(DatabaseManager.getInstance().clearDatabase(),
            mPlaceUseCase.getPlacesListFromRestObservable()
                    .filter(places -> places != null && !places.isEmpty())
                    .doOnNext(places -> mPlaceUseCase.savePlacesToRepository(places)),
            mPlaceUseCase.saveSelectedPlace(selectedPlace),
            mEventUseCase.getEvenListObservableFromRest(selectedPlace.getId())
                    .doOnNext(eventList -> mEventUseCase.saveEventsToRepository(eventList)),
            mEventUseCase.saveSelectedEventObservable(selectedEvent))
            .takeLast(1)
            .subscribe(o -> mSplashScreenUI.setupPlace(selectedPlace));
}

但是此方法仅在最后一个可观察对象上执行 subsribe() 并且不会调用其他方法(我说得对吗?)。我尝试使用过滤器、zip 并制作愚蠢的 if(o instanceOf ArrayList) 子句,但这是错误的方法。有人可以帮帮我吗?

您可以使用 Completable 类型。 Completable 只是通知订阅者流已完成。

使用 Completable.fromObservable() 将您需要订阅的每个可观察对象包装到一个 Completable 中,然后将它们与 Completable.merge() 合并(并行开始)或使用 [=13 连接它们(一个接一个地串联) =].

注意: 确保包装到 Completable 中的所有 observable 都将调用 onComplete,因此不是无穷无尽的流。

非常感谢@koperko!它就像一个魅力,这正是我所需要的。 在重写方法期间,我注意到第一个 Observable 发出 6 个项目(clearDatabase()),所以我将它包装到 Completable 中并且它起作用了! 我没有使用 concat(),而是使用了 flatMap():

private void reloadDatabaseFromRest(final Place selectedPlace, final Event selectedEvent) {
    Completable.fromObservable(
            DatabaseManager.getInstance().clearDatabase()
                    .flatMap(aVoidDB -> mPlaceUseCase.getPlacesListFromRestObservable().filter(places -> places != null && !places.isEmpty()))
                    .flatMap(places -> mPlaceUseCase.savePlacesToRepository(places))
                    .flatMap(aVoidPlace -> mPlaceUseCase.saveSelectedPlace(selectedPlace))
                    .flatMap(aVoidSelectedPlace -> {
                        try {
                            return mEventUseCase.getEvenListObservableFromRest(selectedPlace.getId());
                        } catch (ParseException e) {
                            Log.e(TAG, e.getMessage());
                            return null;
                        }
                    })
                    .filter(eventList -> eventList != null)
                    .flatMap(eventList -> mEventUseCase.saveEventsToRepository(eventList))
                    .flatMap(aVoidEvents -> mEventUseCase.saveSelectedEventObservable(selectedEvent)))
            .subscribe(() -> {
                mSplashScreenUI.setupPlace(selectedPlace);
            });
}