当第一个 Observable 为空时 RxJava concat
RxJava concat when first Observable is empty
我有一个本地存储库和一个从中获取数据的远程存储库。我第一次想从远程存储库中获取数据,然后将其缓存并从本地数据库中获取。
我正在使用 2 个 Observables 和 concat 方法来执行此操作:
Observable.concat(localWeatherForecast, remoteWeatherForecast)
.filter(weatherForecasts -> !weatherForecasts.isEmpty())
.first();
对于本地可观察对象,我使用这个:
private Observable<List<WeatherForecast>> getAndCacheLocalWeather() {
return mLocalDataSource.getWeather()
.flatMap(new Func1<List<WeatherForecast>, Observable<List<WeatherForecast>>>() {
@Override
public Observable<List<WeatherForecast>> call(List<WeatherForecast> weatherEntries) {
return Observable.from(weatherEntries)
.doOnNext(entry -> mCachedWeather.add(entry))
.toList();
}
});
}
对于远程:
return mRemoteDataSource.getWeather()
.flatMap(new Func1<List<WeatherForecast>, Observable<List<WeatherForecast>>>() {
@Override
public Observable<List<WeatherForecast>> call(List<WeatherForecast> weather) {
return Observable.from(weather).doOnNext(entry -> {
mLocalDataSource.saveWeatherEntry(entry);
mCachedWeather.add(entry);
}).toList();
}
})
.doOnCompleted(() -> mCacheIsDirty = false);
这是订阅
Subscription subscription = mRepository
.getWeather()
.subscribeOn(mSchedulerProvider.computation())
.observeOn(mSchedulerProvider.ui())
.subscribe(
// onNext
this::processWeather,
// onError
throwable -> mWeatherView.showLoadingTasksError(),
// onCompleted
() -> mWeatherView.setLoadingIndicator(false));
即使本地存储库为空,concat 似乎也不会移动到第二个可观察对象(远程对象)。如果我颠倒顺序,那么它就可以工作(远程成为 concat 中的第一个)。
我试图删除过滤器方法/first(),但即使如此,远程可观察对象也没有被处理。
知道为什么吗?谢谢!
concat()
将等到第一个 Observable
发出所有内容,然后它将监听第二个 Observable
的发出。如果第一个 Observable
没有发出任何东西,即它是空的,那么第二个 Observable
将不会被考虑。
您可以使用 junk 项目开始您的第一个流,然后跳过流的第一个发射:
Observable.concat(
Observable.<Integer>empty().startWith(-1),
Observable.fromArray(1, 2, 3))
.skip(1)
.test()
.assertResult(1, 2, 3);
现在,第一个 Observable
将发出 -1
并且之后什么也没有,这对于 concat()
运算符正常运行是可以的。稍后您将跳过您不感兴趣的第一个发射。
您可以为第一个可观察对象提供超时。如果第一个 observable 永远不会完成,它就不会切换到第二个。请看看我提供的测试方法:
测试方法 'never_false' 不会产生任何值并在 1000 毫秒后超时,因为没有值被推送。
对于方法 'never_true',concat 中的第一个 observable 将在一段时间后超时并切换到 onComplete observable。因此 concat 将切换到第二个可观察对象并从该流中获取第一个元素。
@Test
public void never_false() throws Exception {
Observable<List<Integer>> never = Observable.never();
Observable<List<Integer>> just = Observable.just(Arrays.asList(1, 2, 3));
Observable<List<Integer>> concat = Observable.concat(never, just);
boolean await = concat.firstElement().test().await(1000, TimeUnit.MILLISECONDS);
assertThat(await).isTrue();
}
@Test
public void never_true() throws Exception {
Observable<List<Integer>> never = Observable.<List<Integer>>never()
.timeout(50, TimeUnit.MILLISECONDS)
.onErrorResumeNext(Observable.empty());
Observable<List<Integer>> just = Observable.just(Arrays.asList(1, 2, 3));
TestObserver<List<Integer>> test = Observable.concat(never, just)
.test()
.await()
.assertComplete()
.assertNoErrors();
List<Integer> collect = test.values().stream()
.flatMap(Collection::stream)
.collect(Collectors.toList());
assertThat(collect).contains(1, 2, 3);
}
我有一个本地存储库和一个从中获取数据的远程存储库。我第一次想从远程存储库中获取数据,然后将其缓存并从本地数据库中获取。 我正在使用 2 个 Observables 和 concat 方法来执行此操作:
Observable.concat(localWeatherForecast, remoteWeatherForecast)
.filter(weatherForecasts -> !weatherForecasts.isEmpty())
.first();
对于本地可观察对象,我使用这个:
private Observable<List<WeatherForecast>> getAndCacheLocalWeather() {
return mLocalDataSource.getWeather()
.flatMap(new Func1<List<WeatherForecast>, Observable<List<WeatherForecast>>>() {
@Override
public Observable<List<WeatherForecast>> call(List<WeatherForecast> weatherEntries) {
return Observable.from(weatherEntries)
.doOnNext(entry -> mCachedWeather.add(entry))
.toList();
}
});
}
对于远程:
return mRemoteDataSource.getWeather()
.flatMap(new Func1<List<WeatherForecast>, Observable<List<WeatherForecast>>>() {
@Override
public Observable<List<WeatherForecast>> call(List<WeatherForecast> weather) {
return Observable.from(weather).doOnNext(entry -> {
mLocalDataSource.saveWeatherEntry(entry);
mCachedWeather.add(entry);
}).toList();
}
})
.doOnCompleted(() -> mCacheIsDirty = false);
这是订阅
Subscription subscription = mRepository
.getWeather()
.subscribeOn(mSchedulerProvider.computation())
.observeOn(mSchedulerProvider.ui())
.subscribe(
// onNext
this::processWeather,
// onError
throwable -> mWeatherView.showLoadingTasksError(),
// onCompleted
() -> mWeatherView.setLoadingIndicator(false));
即使本地存储库为空,concat 似乎也不会移动到第二个可观察对象(远程对象)。如果我颠倒顺序,那么它就可以工作(远程成为 concat 中的第一个)。 我试图删除过滤器方法/first(),但即使如此,远程可观察对象也没有被处理。
知道为什么吗?谢谢!
concat()
将等到第一个 Observable
发出所有内容,然后它将监听第二个 Observable
的发出。如果第一个 Observable
没有发出任何东西,即它是空的,那么第二个 Observable
将不会被考虑。
您可以使用 junk 项目开始您的第一个流,然后跳过流的第一个发射:
Observable.concat(
Observable.<Integer>empty().startWith(-1),
Observable.fromArray(1, 2, 3))
.skip(1)
.test()
.assertResult(1, 2, 3);
现在,第一个 Observable
将发出 -1
并且之后什么也没有,这对于 concat()
运算符正常运行是可以的。稍后您将跳过您不感兴趣的第一个发射。
您可以为第一个可观察对象提供超时。如果第一个 observable 永远不会完成,它就不会切换到第二个。请看看我提供的测试方法:
测试方法 'never_false' 不会产生任何值并在 1000 毫秒后超时,因为没有值被推送。
对于方法 'never_true',concat 中的第一个 observable 将在一段时间后超时并切换到 onComplete observable。因此 concat 将切换到第二个可观察对象并从该流中获取第一个元素。
@Test
public void never_false() throws Exception {
Observable<List<Integer>> never = Observable.never();
Observable<List<Integer>> just = Observable.just(Arrays.asList(1, 2, 3));
Observable<List<Integer>> concat = Observable.concat(never, just);
boolean await = concat.firstElement().test().await(1000, TimeUnit.MILLISECONDS);
assertThat(await).isTrue();
}
@Test
public void never_true() throws Exception {
Observable<List<Integer>> never = Observable.<List<Integer>>never()
.timeout(50, TimeUnit.MILLISECONDS)
.onErrorResumeNext(Observable.empty());
Observable<List<Integer>> just = Observable.just(Arrays.asList(1, 2, 3));
TestObserver<List<Integer>> test = Observable.concat(never, just)
.test()
.await()
.assertComplete()
.assertNoErrors();
List<Integer> collect = test.values().stream()
.flatMap(Collection::stream)
.collect(Collectors.toList());
assertThat(collect).contains(1, 2, 3);
}