RxJava:是否有 Observable.cachedDefer() 运算符或其他等效运算符?

RxJava: Is there an Observable.cachedDefer() operator, or some equivalent?

是否有一种安全的方法来推迟 Observable<T> 的创建,但在创建时将其缓存一段时间,然后它必须再次创建它?

Observable.cachedDefer(() -> createExpensiveFiniteObservable().cache(), 5, TimeUnit.MINUTES); 

我的数据集构建起来很昂贵,我希望它们缓存足够长的时间以支持一个进程,但让它们大约在每个 运行 之间过期。

我已将执行此操作的功能添加到 rxjava-extras。下面的代码取决于 rxjava-extras 0.6.8-RC2 或更高版本(在 Maven Central 上)。

运行 下面的 main 方法,您将看到这些结果似乎与您想要的行为相匹配(只要与下一次订阅的时间间隔 >=5 秒,就会重置缓存):

source emits Tue Oct 13 21:00:16 AEDT 2015
Tue Oct 13 21:00:16 AEDT 2015
Tue Oct 13 21:00:16 AEDT 2015
Tue Oct 13 21:00:16 AEDT 2015
Tue Oct 13 21:00:16 AEDT 2015
Tue Oct 13 21:00:16 AEDT 2015
source emits Tue Oct 13 21:00:31 AEDT 2015
Tue Oct 13 21:00:31 AEDT 2015
Tue Oct 13 21:00:31 AEDT 2015
Tue Oct 13 21:00:31 AEDT 2015
Tue Oct 13 21:00:31 AEDT 2015
Tue Oct 13 21:00:31 AEDT 2015
source emits Tue Oct 13 21:00:46 AEDT 2015
Tue Oct 13 21:00:46 AEDT 2015
Tue Oct 13 21:00:46 AEDT 2015
...

主要方法:

public static void main(String[] args) throws InterruptedException {
    Observable<Date> source = Observable
      .defer(() -> 
         Observable
           .just(new Date())
           .doOnNext(
              d -> System.out.println("source emits " + d)));

    CloseableObservableWithReset<Date> cached = 
        Obs.cache(source, 5, TimeUnit.SECONDS, Schedulers.computation());
    Observable<Date> o = cached
        .observable()
        .doOnSubscribe(() -> cached.reset());
    for (int i = 0; i < 30; i++) {
        o.doOnNext(System.out::println).subscribe();
        Thread.sleep((i % 5 + 1)*1000);
    }
    cached.close();
} 

请注意,要在每次发射时重置缓存,然后在您看到的地方重置缓存

.doOnSubscribe(() -> cached.reset())

.doOnSubscribe(() -> cached.reset())
.doOnNext(x -> cached.reset())