制作一个只执行一次源代码的惰性缓存可观察对象
Making a lazy, cached observable that only execute the source once
我正在尝试使用 rxjs observable 来委托,但在应用程序的整个生命周期中共享一项昂贵的工作。
本质上,类似于:
var work$ = Observable.create((o) => {
const expensive = doSomethingExpensive();
o.next(expensive);
observer.complete();
})
.publishReplay(1)
.refCount();
现在,这工作正常并且完全符合我的要求,除了一件事:如果所有订阅者都取消订阅,那么当下一个订阅者订阅时,我的昂贵工作又会发生。我想保留它。
现在,我可以使用一个主题,或者我可以删除 refCount() 并手动使用连接(永远不会断开连接)。但这会使昂贵的工作在我连接时发生,而不是在订阅者第一次尝试使用工作时发生 $。
本质上,我想要类似于 refCount 的东西,它只查看第一个订阅连接,并且永远不会断开连接。 "lazy connect".
这种事情有可能吗?
publishReplay()
是如何工作的
它在内部创建了一个 ReplaySubject
并使其 multicast
兼容。 ReplaySubject
的最小重播值是 1 次发射。结果如下:
- 第一次订阅将触发
publishReplay(1)
在内部订阅源流并通过 ReplaySubject
传输所有排放,有效地缓存最后一个 n( =1) 排放
- 如果在源仍处于活动状态时开始第二个订阅,
multicast()
会将我们连接到同一个 replaySubject
,我们将收到所有下一次排放,直到源流完成。
- 如果订阅在源已经完成后开始,replaySubject 缓存了最后的 n 次发射,它只会在完成之前接收那些。
const source = Rx.Observable.from([1,2])
.mergeMap(i => Rx.Observable.of('emission:'+i).delay(i * 100))
.do(null,null,() => console.log('source stream completed'))
.publishReplay(1)
.refCount();
// two subscriptions which are both in time before the stream completes
source.subscribe(val => console.log(`sub1:${val}`), null, () => console.log('sub1 completed'));
source.subscribe(val => console.log(`sub2:${val}`), null, () => console.log('sub2 completed'));
// new subscription after the stream has completed already
setTimeout(() => {
source.subscribe(val => console.log(`sub_late-to-the-party:${val}`), null, () => console.log('sub_late-to-the-party completed'));
}, 500);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.3/Rx.js"></script>
我正在尝试使用 rxjs observable 来委托,但在应用程序的整个生命周期中共享一项昂贵的工作。
本质上,类似于:
var work$ = Observable.create((o) => {
const expensive = doSomethingExpensive();
o.next(expensive);
observer.complete();
})
.publishReplay(1)
.refCount();
现在,这工作正常并且完全符合我的要求,除了一件事:如果所有订阅者都取消订阅,那么当下一个订阅者订阅时,我的昂贵工作又会发生。我想保留它。
现在,我可以使用一个主题,或者我可以删除 refCount() 并手动使用连接(永远不会断开连接)。但这会使昂贵的工作在我连接时发生,而不是在订阅者第一次尝试使用工作时发生 $。
本质上,我想要类似于 refCount 的东西,它只查看第一个订阅连接,并且永远不会断开连接。 "lazy connect".
这种事情有可能吗?
publishReplay()
是如何工作的
它在内部创建了一个 ReplaySubject
并使其 multicast
兼容。 ReplaySubject
的最小重播值是 1 次发射。结果如下:
- 第一次订阅将触发
publishReplay(1)
在内部订阅源流并通过ReplaySubject
传输所有排放,有效地缓存最后一个 n( =1) 排放 - 如果在源仍处于活动状态时开始第二个订阅,
multicast()
会将我们连接到同一个replaySubject
,我们将收到所有下一次排放,直到源流完成。 - 如果订阅在源已经完成后开始,replaySubject 缓存了最后的 n 次发射,它只会在完成之前接收那些。
const source = Rx.Observable.from([1,2])
.mergeMap(i => Rx.Observable.of('emission:'+i).delay(i * 100))
.do(null,null,() => console.log('source stream completed'))
.publishReplay(1)
.refCount();
// two subscriptions which are both in time before the stream completes
source.subscribe(val => console.log(`sub1:${val}`), null, () => console.log('sub1 completed'));
source.subscribe(val => console.log(`sub2:${val}`), null, () => console.log('sub2 completed'));
// new subscription after the stream has completed already
setTimeout(() => {
source.subscribe(val => console.log(`sub_late-to-the-party:${val}`), null, () => console.log('sub_late-to-the-party completed'));
}, 500);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.3/Rx.js"></script>