RxJava:是否有 Observable.cachedDefer() 运算符或其他等效运算符?
RxJava: Is there an Observable.cachedDefer() operator, or some equivalent?
是否有一种安全的方法来推迟 Observable<T>
的创建,但在创建时将其缓存一段时间,然后它必须再次创建它?
Observable.cachedDefer(() -> createExpensiveFiniteObservable().cache(), 5, TimeUnit.MINUTES);
我的数据集构建起来很昂贵,我希望它们缓存足够长的时间以支持一个进程,但让它们大约在每个 运行 之间过期。
我已将执行此操作的功能添加到 rxjava-extras。下面的代码取决于 rxjava-extras 0.6.8-RC2 或更高版本(在 Maven Central 上)。
运行 下面的 main
方法,您将看到这些结果似乎与您想要的行为相匹配(只要与下一次订阅的时间间隔 >=5 秒,就会重置缓存):
source emits Tue Oct 13 21:00:16 AEDT 2015
Tue Oct 13 21:00:16 AEDT 2015
Tue Oct 13 21:00:16 AEDT 2015
Tue Oct 13 21:00:16 AEDT 2015
Tue Oct 13 21:00:16 AEDT 2015
Tue Oct 13 21:00:16 AEDT 2015
source emits Tue Oct 13 21:00:31 AEDT 2015
Tue Oct 13 21:00:31 AEDT 2015
Tue Oct 13 21:00:31 AEDT 2015
Tue Oct 13 21:00:31 AEDT 2015
Tue Oct 13 21:00:31 AEDT 2015
Tue Oct 13 21:00:31 AEDT 2015
source emits Tue Oct 13 21:00:46 AEDT 2015
Tue Oct 13 21:00:46 AEDT 2015
Tue Oct 13 21:00:46 AEDT 2015
...
主要方法:
public static void main(String[] args) throws InterruptedException {
Observable<Date> source = Observable
.defer(() ->
Observable
.just(new Date())
.doOnNext(
d -> System.out.println("source emits " + d)));
CloseableObservableWithReset<Date> cached =
Obs.cache(source, 5, TimeUnit.SECONDS, Schedulers.computation());
Observable<Date> o = cached
.observable()
.doOnSubscribe(() -> cached.reset());
for (int i = 0; i < 30; i++) {
o.doOnNext(System.out::println).subscribe();
Thread.sleep((i % 5 + 1)*1000);
}
cached.close();
}
请注意,要在每次发射时重置缓存,然后在您看到的地方重置缓存
.doOnSubscribe(() -> cached.reset())
放
.doOnSubscribe(() -> cached.reset())
.doOnNext(x -> cached.reset())
是否有一种安全的方法来推迟 Observable<T>
的创建,但在创建时将其缓存一段时间,然后它必须再次创建它?
Observable.cachedDefer(() -> createExpensiveFiniteObservable().cache(), 5, TimeUnit.MINUTES);
我的数据集构建起来很昂贵,我希望它们缓存足够长的时间以支持一个进程,但让它们大约在每个 运行 之间过期。
我已将执行此操作的功能添加到 rxjava-extras。下面的代码取决于 rxjava-extras 0.6.8-RC2 或更高版本(在 Maven Central 上)。
运行 下面的 main
方法,您将看到这些结果似乎与您想要的行为相匹配(只要与下一次订阅的时间间隔 >=5 秒,就会重置缓存):
source emits Tue Oct 13 21:00:16 AEDT 2015
Tue Oct 13 21:00:16 AEDT 2015
Tue Oct 13 21:00:16 AEDT 2015
Tue Oct 13 21:00:16 AEDT 2015
Tue Oct 13 21:00:16 AEDT 2015
Tue Oct 13 21:00:16 AEDT 2015
source emits Tue Oct 13 21:00:31 AEDT 2015
Tue Oct 13 21:00:31 AEDT 2015
Tue Oct 13 21:00:31 AEDT 2015
Tue Oct 13 21:00:31 AEDT 2015
Tue Oct 13 21:00:31 AEDT 2015
Tue Oct 13 21:00:31 AEDT 2015
source emits Tue Oct 13 21:00:46 AEDT 2015
Tue Oct 13 21:00:46 AEDT 2015
Tue Oct 13 21:00:46 AEDT 2015
...
主要方法:
public static void main(String[] args) throws InterruptedException {
Observable<Date> source = Observable
.defer(() ->
Observable
.just(new Date())
.doOnNext(
d -> System.out.println("source emits " + d)));
CloseableObservableWithReset<Date> cached =
Obs.cache(source, 5, TimeUnit.SECONDS, Schedulers.computation());
Observable<Date> o = cached
.observable()
.doOnSubscribe(() -> cached.reset());
for (int i = 0; i < 30; i++) {
o.doOnNext(System.out::println).subscribe();
Thread.sleep((i % 5 + 1)*1000);
}
cached.close();
}
请注意,要在每次发射时重置缓存,然后在您看到的地方重置缓存
.doOnSubscribe(() -> cached.reset())
放
.doOnSubscribe(() -> cached.reset())
.doOnNext(x -> cached.reset())