rxjs 5 publishReplay refCount
rxjs 5 publishReplay refCount
我不知道 publishReplay().refCount()
是如何工作的。
例如(https://jsfiddle.net/7o3a45L1/):
var source = Rx.Observable.create(observer => {
console.log("call");
// expensive http request
observer.next(5);
}).publishReplay().refCount();
subscription1 = source.subscribe({next: (v) => console.log('observerA: ' + v)});
subscription1.unsubscribe();
console.log("");
subscription2 = source.subscribe({next: (v) => console.log('observerB: ' + v)});
subscription2.unsubscribe();
console.log("");
subscription3 = source.subscribe({next: (v) => console.log('observerC: ' + v)});
subscription3.unsubscribe();
console.log("");
subscription4 = source.subscribe({next: (v) => console.log('observerD: ' + v)});
subscription4.unsubscribe();
给出以下结果:
call observerA: 5
observerB: 5 call observerB: 5
observerC: 5 observerC: 5 call observerC: 5
observerD: 5 observerD: 5 observerD: 5 call observerD: 5
1) 为什么会多次调用observerB、C、D?
2) 为什么每行打印 "call" 而不是行首?
此外,如果我调用 publishReplay(1).refCount()
,它会分别调用 observerB、C 和 D 两次。
我期望的是每个新观察者只收到一次值 5 并且 "call" 只打印一次。
发生这种情况是因为您正在使用 publishReplay()
。它在内部创建了一个 ReplaySubject
的实例,用于存储所有经过的值。
由于您使用 Observable.create
发出单个值,因此每次调用 source.subscribe(...)
时都会将一个值附加到 ReplaySubject
.
中的缓冲区
你没有在每一行的开头打印 call
因为它是 ReplaySubject
在你订阅时首先发出它的缓冲区然后它自己订阅它的来源:
有关实施详情,请参阅:
https://github.com/ReactiveX/rxjs/blob/master/src/operator/multicast.ts#L63
https://github.com/ReactiveX/rxjs/blob/master/src/ReplaySubject.ts#L54
使用publishReplay(1)
时也是如此。首先,它从 ReplaySubject
发出缓冲项,然后从 observer.next(5);
发出另一个项
通常:refCount
意味着,只要至少有 1 个订阅者,流就是 hot/shared - 但是,当没有订阅者时,它就是 reset/cold。
这意味着如果你想绝对确定没有任何事情被执行超过一次,你不应该使用 refCount()
而只是 connect
流来设置热。
作为附加说明:如果您在 observer.next(5);
之后添加 observer.complete()
,您也会得到预期的结果。
旁注:您真的需要在这里创建自己的自定义 Obervable
吗?在 95% 的情况下,现有运算符足以满足给定的用例。
publishReplay(x).refCount()
合并执行以下操作:
- 它创建一个
ReplaySubject
重播最多 x 次发射。如果未定义 x 则它会重播完整的流。
- 它使用 refCount() 运算符使此
ReplaySubject
多播兼容。这导致 并发 订阅接收相同的排放量。
您的示例包含一些问题,影响了它们如何协同工作。请参阅以下修改后的片段:
var state = 5
var realSource = Rx.Observable.create(observer => {
console.log("creating expensive HTTP-based emission");
observer.next(state++);
// observer.complete();
return () => {
console.log('unsubscribing from source')
}
});
var source = Rx.Observable.of('')
.do(() => console.log('stream subscribed'))
.ignoreElements()
.concat(realSource)
.do(null, null, () => console.log('stream completed'))
.publishReplay()
.refCount()
;
subscription1 = source.subscribe({next: (v) => console.log('observerA: ' + v)});
subscription1.unsubscribe();
subscription2 = source.subscribe(v => console.log('observerB: ' + v));
subscription2.unsubscribe();
subscription3 = source.subscribe(v => console.log('observerC: ' + v));
subscription3.unsubscribe();
subscription4 = source.subscribe(v => console.log('observerD: ' + v));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.1.0/Rx.js"></script>
当 运行 这个片段我们可以清楚地看到它没有为 Observer D 发出重复值,它实际上为每个订阅创建新的发射。怎么会?
每次订阅都会在下一次订阅发生之前取消订阅。这有效地使 refCount 减少回零,没有进行多播。
问题在于 realSource
流未完成。因为我们没有多播,所以下一个订阅者通过 ReplaySubject 获得了 realSource
的新实例,并且新的发射是在先前已经发射的发射之前。
因此,为了避免您的流多次调用昂贵的 HTTP 请求,您必须完成流,以便 publishReplay 知道它不需要 re-subscribe。
我不知道 publishReplay().refCount()
是如何工作的。
例如(https://jsfiddle.net/7o3a45L1/):
var source = Rx.Observable.create(observer => {
console.log("call");
// expensive http request
observer.next(5);
}).publishReplay().refCount();
subscription1 = source.subscribe({next: (v) => console.log('observerA: ' + v)});
subscription1.unsubscribe();
console.log("");
subscription2 = source.subscribe({next: (v) => console.log('observerB: ' + v)});
subscription2.unsubscribe();
console.log("");
subscription3 = source.subscribe({next: (v) => console.log('observerC: ' + v)});
subscription3.unsubscribe();
console.log("");
subscription4 = source.subscribe({next: (v) => console.log('observerD: ' + v)});
subscription4.unsubscribe();
给出以下结果:
call observerA: 5
observerB: 5 call observerB: 5
observerC: 5 observerC: 5 call observerC: 5
observerD: 5 observerD: 5 observerD: 5 call observerD: 5
1) 为什么会多次调用observerB、C、D?
2) 为什么每行打印 "call" 而不是行首?
此外,如果我调用 publishReplay(1).refCount()
,它会分别调用 observerB、C 和 D 两次。
我期望的是每个新观察者只收到一次值 5 并且 "call" 只打印一次。
发生这种情况是因为您正在使用 publishReplay()
。它在内部创建了一个 ReplaySubject
的实例,用于存储所有经过的值。
由于您使用 Observable.create
发出单个值,因此每次调用 source.subscribe(...)
时都会将一个值附加到 ReplaySubject
.
你没有在每一行的开头打印 call
因为它是 ReplaySubject
在你订阅时首先发出它的缓冲区然后它自己订阅它的来源:
有关实施详情,请参阅:
https://github.com/ReactiveX/rxjs/blob/master/src/operator/multicast.ts#L63
https://github.com/ReactiveX/rxjs/blob/master/src/ReplaySubject.ts#L54
使用publishReplay(1)
时也是如此。首先,它从 ReplaySubject
发出缓冲项,然后从 observer.next(5);
通常:refCount
意味着,只要至少有 1 个订阅者,流就是 hot/shared - 但是,当没有订阅者时,它就是 reset/cold。
这意味着如果你想绝对确定没有任何事情被执行超过一次,你不应该使用 refCount()
而只是 connect
流来设置热。
作为附加说明:如果您在 observer.next(5);
之后添加 observer.complete()
,您也会得到预期的结果。
旁注:您真的需要在这里创建自己的自定义 Obervable
吗?在 95% 的情况下,现有运算符足以满足给定的用例。
publishReplay(x).refCount()
合并执行以下操作:
- 它创建一个
ReplaySubject
重播最多 x 次发射。如果未定义 x 则它会重播完整的流。 - 它使用 refCount() 运算符使此
ReplaySubject
多播兼容。这导致 并发 订阅接收相同的排放量。
您的示例包含一些问题,影响了它们如何协同工作。请参阅以下修改后的片段:
var state = 5
var realSource = Rx.Observable.create(observer => {
console.log("creating expensive HTTP-based emission");
observer.next(state++);
// observer.complete();
return () => {
console.log('unsubscribing from source')
}
});
var source = Rx.Observable.of('')
.do(() => console.log('stream subscribed'))
.ignoreElements()
.concat(realSource)
.do(null, null, () => console.log('stream completed'))
.publishReplay()
.refCount()
;
subscription1 = source.subscribe({next: (v) => console.log('observerA: ' + v)});
subscription1.unsubscribe();
subscription2 = source.subscribe(v => console.log('observerB: ' + v));
subscription2.unsubscribe();
subscription3 = source.subscribe(v => console.log('observerC: ' + v));
subscription3.unsubscribe();
subscription4 = source.subscribe(v => console.log('observerD: ' + v));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.1.0/Rx.js"></script>
当 运行 这个片段我们可以清楚地看到它没有为 Observer D 发出重复值,它实际上为每个订阅创建新的发射。怎么会?
每次订阅都会在下一次订阅发生之前取消订阅。这有效地使 refCount 减少回零,没有进行多播。
问题在于 realSource
流未完成。因为我们没有多播,所以下一个订阅者通过 ReplaySubject 获得了 realSource
的新实例,并且新的发射是在先前已经发射的发射之前。
因此,为了避免您的流多次调用昂贵的 HTTP 请求,您必须完成流,以便 publishReplay 知道它不需要 re-subscribe。