当第一个 Observable 为空时 RxJava concat

RxJava concat when first Observable is empty

我有一个本地存储库和一个从中获取数据的远程存储库。我第一次想从远程存储库中获取数据,然后将其缓存并从本地数据库中获取。 我正在使用 2 个 Observables 和 concat 方法来执行此操作:

Observable.concat(localWeatherForecast, remoteWeatherForecast)
                .filter(weatherForecasts -> !weatherForecasts.isEmpty())
                .first();

对于本地可观察对象,我使用这个:

private Observable<List<WeatherForecast>> getAndCacheLocalWeather() {
    return mLocalDataSource.getWeather()
            .flatMap(new Func1<List<WeatherForecast>, Observable<List<WeatherForecast>>>() {
                @Override
                public Observable<List<WeatherForecast>> call(List<WeatherForecast> weatherEntries) {
                    return Observable.from(weatherEntries)
                            .doOnNext(entry -> mCachedWeather.add(entry))
                            .toList();
                }
            });
}

对于远程:

return mRemoteDataSource.getWeather()
            .flatMap(new Func1<List<WeatherForecast>, Observable<List<WeatherForecast>>>() {
                @Override
                public Observable<List<WeatherForecast>> call(List<WeatherForecast> weather) {
                    return Observable.from(weather).doOnNext(entry -> {
                        mLocalDataSource.saveWeatherEntry(entry);
                        mCachedWeather.add(entry);
                    }).toList();
                }
            })
            .doOnCompleted(() -> mCacheIsDirty = false);

这是订阅

Subscription subscription = mRepository
            .getWeather()
            .subscribeOn(mSchedulerProvider.computation())
            .observeOn(mSchedulerProvider.ui())
            .subscribe(
                    // onNext
                    this::processWeather,
                    // onError
                    throwable -> mWeatherView.showLoadingTasksError(),
                    // onCompleted
                    () -> mWeatherView.setLoadingIndicator(false));

即使本地存储库为空,concat 似乎也不会移动到第二个可观察对象(远程对象)。如果我颠倒顺序,那么它就可以工作(远程成为 concat 中的第一个)。 我试图删除过滤器方法/first(),但即使如此,远程可观察对象也没有被处理。

知道为什么吗?谢谢!

concat() 将等到第一个 Observable 发出所有内容,然后它将监听第二个 Observable 的发出。如果第一个 Observable 没有发出任何东西,即它是空的,那么第二个 Observable 将不会被考虑。

您可以使用 junk 项目开始您的第一个流,然后跳过流的第一个发射:

Observable.concat(
            Observable.<Integer>empty().startWith(-1),
            Observable.fromArray(1, 2, 3))
        .skip(1)
        .test()
        .assertResult(1, 2, 3);

现在,第一个 Observable 将发出 -1 并且之后什么也没有,这对于 concat() 运算符正常运行是可以的。稍后您将跳过您不感兴趣的第一个发射。

您可以为第一个可观察对象提供超时。如果第一个 observable 永远不会完成,它就不会切换到第二个。请看看我提供的测试方法:

测试方法 'never_false' 不会产生任何值并在 1000 毫秒后超时,因为没有值被推送。

对于方法 'never_true',concat 中的第一个 observable 将在一段时间后超时并切换到 onComplete observable。因此 concat 将切换到第二个可观察对象并从该流中获取第一个元素。

@Test
public void never_false() throws Exception {
    Observable<List<Integer>> never = Observable.never();

    Observable<List<Integer>> just = Observable.just(Arrays.asList(1, 2, 3));
    Observable<List<Integer>> concat = Observable.concat(never, just);

    boolean await = concat.firstElement().test().await(1000, TimeUnit.MILLISECONDS);

    assertThat(await).isTrue();
}

@Test
public void never_true() throws Exception {
    Observable<List<Integer>> never = Observable.<List<Integer>>never()
            .timeout(50, TimeUnit.MILLISECONDS)
            .onErrorResumeNext(Observable.empty());

    Observable<List<Integer>> just = Observable.just(Arrays.asList(1, 2, 3));

    TestObserver<List<Integer>> test = Observable.concat(never, just)
            .test()
            .await()
            .assertComplete()
            .assertNoErrors();

    List<Integer> collect = test.values().stream()
            .flatMap(Collection::stream)
            .collect(Collectors.toList());

    assertThat(collect).contains(1, 2, 3);
}