制作一个只执行一次源代码的惰性缓存可观察对象

Making a lazy, cached observable that only execute the source once

我正在尝试使用 rxjs observable 来委托,但在应用程序的整个生命周期中共享一项昂贵的工作。

本质上,类似于:

var work$ = Observable.create((o) => {
  const expensive = doSomethingExpensive();
  o.next(expensive);
  observer.complete();
})
.publishReplay(1)
.refCount();

现在,这工作正常并且完全符合我的要求,除了一件事:如果所有订阅者都取消订阅,那么当下一个订阅者订阅时,我的昂贵工作又会发生。我想保留它。

现在,我可以使用一个主题,或者我可以删除 refCount() 并手动使用连接(永远不会断开连接)。但这会使昂贵的工作在我连接时发生,而不是在订阅者第一次尝试使用工作时发生 $。

本质上,我想要类似于 refCount 的东西,它只查看第一个订阅连接,并且永远不会断开连接。 "lazy connect".

这种事情有可能吗?

publishReplay() 是如何工作的

它在内部创建了一个 ReplaySubject 并使其 multicast 兼容。 ReplaySubject 的最小重播值是 1 次发射。结果如下:

  • 第一次订阅将触发 publishReplay(1) 在内部订阅源流并通过 ReplaySubject 传输所有排放,有效地缓存最后一个 n( =1) 排放
  • 如果在源仍处于活动状态时开始第二个订阅,multicast() 会将我们连接到同一个 replaySubject,我们将收到所有下一次排放,直到源流完成。
  • 如果订阅在源已经完成后开始,replaySubject 缓存了最后的 n 次发射,它只会在完成之前接收那些。

const source = Rx.Observable.from([1,2])
  .mergeMap(i => Rx.Observable.of('emission:'+i).delay(i * 100))
  .do(null,null,() => console.log('source stream completed'))
  .publishReplay(1)
  .refCount();

// two subscriptions which are both in time before the stream completes
source.subscribe(val => console.log(`sub1:${val}`), null, () => console.log('sub1 completed'));
source.subscribe(val => console.log(`sub2:${val}`), null, () => console.log('sub2 completed'));

// new subscription after the stream has completed already
setTimeout(() => {
  source.subscribe(val => console.log(`sub_late-to-the-party:${val}`), null, () => console.log('sub_late-to-the-party completed'));
}, 500);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.3/Rx.js"></script>