为什么在创建新订阅时会重置扫描累加器值,但在使用 shareReplay(1) 时不会重置?
Why does scan accumulator value reset when new subscriptions are created but not when using shareReplay(1)?
我编写了以下代码,期望最后一次订阅会打印 ['emitted value 1'、'emitted value 2'],但实际上只打印 ['emitted value 2']。这是有道理的,因为 BehaviorSubject 发出最后一个值,但为什么扫描累加器会重置为种子值?
我发现使用 shareReplay(1) 可以轻松修复此行为,但我不太明白为什么。在这两种情况下,扫描累加器发生了什么,使用 shareReplay(1) 还是不使用它?
const subject = new BehaviorSubject(null);
const action$ = subject.asObservable();
const obs$ = action$.pipe(scan((acc, curr) => [...acc, curr], []));
// const obs$ = action$.pipe(scan((acc, curr) => [...acc, curr], []), shareReplay(1));
obs$.subscribe(res => console.log('first subs', res));
subject.next('emitted value 1');
subject.next('emitter value 2');
obs$.subscribe(res => console.log('second subs', res));
This is the console output without shareReplay(1)
And with shareReplay(1)
当您订阅 BehaviorSubject
时,您将收到最后发出的值。这就是为什么您在控制台中只收到:second subs ["emitter value 2"]
。
shareReplay
来自 doc :
Why use shareReplay? You generally want to use shareReplay when you have side-effects or taxing computations that you do not wish to
be executed amongst multiple subscribers. It may also be valuable in
situations where you know you will have late subscribers to a stream
that need access to previously emitted values. This ability to replay
values on subscription is what differentiates share and shareReplay.
这意味着您将收到所有发出的值而无需任何计算,这是一种缓存。这在控制台中也可见,您只有一个打印包含第二个订阅的所有值:
second subs [null, "emitted value 1", "emitter value 2"]
当您订阅一个 observable 时,由于找不到更好的词,它会变成 'run'。这可能会变得有点混乱,因为值会随着时间的推移而分散,所以我们会坚持使用同步调用。考虑以下可观察的。
const getRandom = () => Math.floor(Math.random() * 10);
const stream$ = defer(() => from([getRandom(), getRandom(), getRandom()]));
stream$.subscribe(console.log); // 4 6 2
stream$.subscribe(console.log); // 4 4 2
stream$.subscribe(console.log); // 1 8 6
stream$.subscribe(console.log); // 8 6 7
注意到每个订阅如何获得不同的号码了吗?他们都是 re-running getRandom()
,他们的订阅值不同。
如果我们在 stream$
中放置一个累加器,每个订阅都会从一个新的种子开始,并根据他们的流发出的内容获得结果。
所以:
const getRandom = () => Math.floor(Math.random() * 10);
const stream$ = defer(
() => from([getRandom(), getRandom(), getRandom()])
).pipe(
reduce((acc, val) => acc + val, 0)
);
stream$.subscribe(console.log); // 12 (4+6+2)
stream$.subscribe(console.log); // 10 (4+4+2)
stream$.subscribe(console.log); // 15 (1+8+6)
stream$.subscribe(console.log); // 21 (8+6+7)
看看为什么每个订阅都没有添加到前一个?它只是将自己的数字相加。
他们不会在不同的订阅之间共享累加器或值或任何东西。
那么当你使用 shareReplay()
之类的东西时会发生什么?在这种情况下,stream$ 不会被订阅 'run' 4 次不同的时间,它只会被订阅一次,无论之后有多少人订阅,单次订阅的结果都会共享。
const getRandom = () => Math.floor(Math.random() * 10);
const stream$ = defer(
() => from([getRandom(), getRandom(), getRandom()])
).pipe(
reduce((acc, val) => acc + val, 0),
shareReplay(1)
);
stream$.subscribe(console.log); // 12 (4+6+2)
stream$.subscribe(console.log); // 12
stream$.subscribe(console.log); // 12
stream$.subscribe(console.log); // 12
在这里,我们的累加器只会得到 运行 一次。
如果你这样改:
const getRandom = () => Math.floor(Math.random() * 10);
const stream$ = defer(
() => from([getRandom(), getRandom(), getRandom()])
).pipe(
shareReplay(3),
reduce((acc, val) => acc + val, 0)
);
stream$.subscribe(console.log); // 12 (4+6+2)
stream$.subscribe(console.log); // 12 (4+6+2)
stream$.subscribe(console.log); // 12 (4+6+2)
stream$.subscribe(console.log); // 12 (4+6+2)
然后累加器是 运行 4 次,但是 getRandom()
只会被调用以创建前 3 个值,之后每个订阅都会获得相同的 3 个数字。
所以,回答你的问题:
当您没有 shareReplay 时,每个订阅者 运行 都有自己的 scan
流从他们订阅时开始。 WITH shareReply,scan
仅订阅一次。您将这些结果多播给所有未来的订阅者。这就是区别。
我编写了以下代码,期望最后一次订阅会打印 ['emitted value 1'、'emitted value 2'],但实际上只打印 ['emitted value 2']。这是有道理的,因为 BehaviorSubject 发出最后一个值,但为什么扫描累加器会重置为种子值?
我发现使用 shareReplay(1) 可以轻松修复此行为,但我不太明白为什么。在这两种情况下,扫描累加器发生了什么,使用 shareReplay(1) 还是不使用它?
const subject = new BehaviorSubject(null);
const action$ = subject.asObservable();
const obs$ = action$.pipe(scan((acc, curr) => [...acc, curr], []));
// const obs$ = action$.pipe(scan((acc, curr) => [...acc, curr], []), shareReplay(1));
obs$.subscribe(res => console.log('first subs', res));
subject.next('emitted value 1');
subject.next('emitter value 2');
obs$.subscribe(res => console.log('second subs', res));
This is the console output without shareReplay(1)
And with shareReplay(1)
当您订阅 BehaviorSubject
时,您将收到最后发出的值。这就是为什么您在控制台中只收到:second subs ["emitter value 2"]
。
shareReplay
来自 doc :
Why use shareReplay? You generally want to use shareReplay when you have side-effects or taxing computations that you do not wish to be executed amongst multiple subscribers. It may also be valuable in situations where you know you will have late subscribers to a stream that need access to previously emitted values. This ability to replay values on subscription is what differentiates share and shareReplay.
这意味着您将收到所有发出的值而无需任何计算,这是一种缓存。这在控制台中也可见,您只有一个打印包含第二个订阅的所有值:
second subs [null, "emitted value 1", "emitter value 2"]
当您订阅一个 observable 时,由于找不到更好的词,它会变成 'run'。这可能会变得有点混乱,因为值会随着时间的推移而分散,所以我们会坚持使用同步调用。考虑以下可观察的。
const getRandom = () => Math.floor(Math.random() * 10);
const stream$ = defer(() => from([getRandom(), getRandom(), getRandom()]));
stream$.subscribe(console.log); // 4 6 2
stream$.subscribe(console.log); // 4 4 2
stream$.subscribe(console.log); // 1 8 6
stream$.subscribe(console.log); // 8 6 7
注意到每个订阅如何获得不同的号码了吗?他们都是 re-running getRandom()
,他们的订阅值不同。
如果我们在 stream$
中放置一个累加器,每个订阅都会从一个新的种子开始,并根据他们的流发出的内容获得结果。
所以:
const getRandom = () => Math.floor(Math.random() * 10);
const stream$ = defer(
() => from([getRandom(), getRandom(), getRandom()])
).pipe(
reduce((acc, val) => acc + val, 0)
);
stream$.subscribe(console.log); // 12 (4+6+2)
stream$.subscribe(console.log); // 10 (4+4+2)
stream$.subscribe(console.log); // 15 (1+8+6)
stream$.subscribe(console.log); // 21 (8+6+7)
看看为什么每个订阅都没有添加到前一个?它只是将自己的数字相加。
他们不会在不同的订阅之间共享累加器或值或任何东西。
那么当你使用 shareReplay()
之类的东西时会发生什么?在这种情况下,stream$ 不会被订阅 'run' 4 次不同的时间,它只会被订阅一次,无论之后有多少人订阅,单次订阅的结果都会共享。
const getRandom = () => Math.floor(Math.random() * 10);
const stream$ = defer(
() => from([getRandom(), getRandom(), getRandom()])
).pipe(
reduce((acc, val) => acc + val, 0),
shareReplay(1)
);
stream$.subscribe(console.log); // 12 (4+6+2)
stream$.subscribe(console.log); // 12
stream$.subscribe(console.log); // 12
stream$.subscribe(console.log); // 12
在这里,我们的累加器只会得到 运行 一次。
如果你这样改:
const getRandom = () => Math.floor(Math.random() * 10);
const stream$ = defer(
() => from([getRandom(), getRandom(), getRandom()])
).pipe(
shareReplay(3),
reduce((acc, val) => acc + val, 0)
);
stream$.subscribe(console.log); // 12 (4+6+2)
stream$.subscribe(console.log); // 12 (4+6+2)
stream$.subscribe(console.log); // 12 (4+6+2)
stream$.subscribe(console.log); // 12 (4+6+2)
然后累加器是 运行 4 次,但是 getRandom()
只会被调用以创建前 3 个值,之后每个订阅都会获得相同的 3 个数字。
所以,回答你的问题:
当您没有 shareReplay 时,每个订阅者 运行 都有自己的 scan
流从他们订阅时开始。 WITH shareReply,scan
仅订阅一次。您将这些结果多播给所有未来的订阅者。这就是区别。