RxJS share 与 shareReplay 的区别
RxJS share vs shareReplay differences
share 和 shareReplay(refcount:true)取消订阅的方式似乎存在奇怪的差异。
考虑以下内容(可以粘贴到 rxviz.com):
const { interval } = Rx;
const { take, shareReplay, share, timeoutWith, startWith , finalize} = RxOperators;
const shareReplay$ = interval(2000).pipe(
finalize(() => console.log('[finalize] Called shareReplay$')),
take(1),
shareReplay({refcount:true, bufferSize: 0}));
shareReplay$.pipe(
timeoutWith(1000, shareReplay$.pipe(startWith('X'))),
)
const share$ = interval(2000).pipe(
finalize(() => console.log('[finalize] Called on share$')),
take(1),
share());
share$.pipe(
timeoutWith(1000, share$.pipe(startWith('X'))),
)
streamReplay$ 的输出将为 -X-0-,而 shareStream$ 的输出将为 -X--0。似乎共享在 timeoutWith 可以重新订阅之前从源取消订阅,而 shareReplay 设法将共享订阅保持足够长的时间以便重新使用。
我想用它在 RPC 上添加一个本地超时(同时保持调用打开),任何重新订阅都将是灾难性的,所以想避免这些行为中的一个是错误的并得到以后改了。
我可以使用 race(),并将 rpc 调用与延迟的 startsWith 合并(因此它们同时订阅),但它会需要更多的代码和运算符。
编辑:一种可能的解决方案是合并两个订阅,一个是共享请求,另一个是延迟流,直到共享流发出:
merge(share$, of('still working...').pipe( delay(1000), takeUntil(share$)));
通过这种方式可以同时订阅共享流,因此当一个操作员取消订阅时不存在“灰色区域”。 (除非有人提出更好的建议,否则会将其转化为答案)或者可以解释 share 和 shareReplay
之间的 intentions/differences
时间保证
Javascript 的 运行 时间没有任何形式的时间保证。在单线程环境中,这是有道理的(并且您可以开始使用 web workers 等做得更好)。如果 compute-heavy 发生了什么,那个时间点 window 涵盖的所有内容都会等待。不过,大多数情况下它会按预期顺序发生。
无论如何,
const hello = (name: string) => () => console.log(`Hello ${name}`);
setTimeout(hello("first"), 2000);
setTimeout(hello("second"), 2000);
在 Node、V8 或 SpiderMonkey 中,您确实会保留此处的顺序。但是这个呢?
const hello = (name: string) => () => console.log(`Hello ${name}`);
setTimeout(hello("first"), 1);
setTimeout(hello("second"), 0);
在这里你会假设第二总是第一,因为它应该早一毫秒发生。如果您 运行 使用 SpiderMonkey,则顺序取决于事件循环的繁忙程度。他们使用较短的超时时间,因为 0 毫秒的超时时间平均需要大约 8 毫秒。
始终使异步依赖显式
在 JavaScript 中,最好的做法是永远不要隐含任何时序依赖性。
在下面的代码中,我们可以合理地知道调用data.value
时data
不会是undefined。这隐含地依赖于异步交错:
let data;
setTimeout(() => {data = {value: 5};}, 1000);
setTimeout(() => console.log(data.value), 2000);
我们真的应该明确说明这种依赖关系。通过检查数据是否未定义或通过重组我们的调用
setTimeout(() => {
const data = {value: 5};
setTimeout(() => console.log(data.value), 1000);
}, 1000);
分享与 ShareReplay
want to avoid the risk one of these behaviors is a mistake and gets changed in the future.
这里真正的风险甚至不在于图书馆如何实现差异。这也是 language-level 的风险。无论哪种方式,您都依赖于异步交错。你 运行 有一个 bug 的风险,这个 bug 只出现一次在蓝色月亮上并且不容易 re-created/tested 等等
分享运营商有 ShareConfig
(source)
export interface ShareConfig<T> {
connector?: () => SubjectLike<T>;
resetOnError?: boolean | ((error: any) => Observable<any>);
resetOnComplete?: boolean | (() => Observable<any>);
resetOnRefCountZero?: boolean | (() => Observable<any>);
}
如果您使用 vanilla shareReplay(1)
或 replay({resetOnRefCountZero: false})
,那么您不依赖于 JS 事件循环中事件的排序方式。
速记:
我怀疑在描述您所追求的行为之前,您不会得到很多答案。
use this to add a local timeout on an RPC (while keeping the call open)
我不确定保持 RPC 调用打开需要什么。这听起来像是您想要取消 RxJS 中的请求但不取消飞行中的 RPC 的东西。我觉得那是域不匹配。为什么不让您的可观察对象与它们所代表的调用的语义保持一致?
寻求解决方案
看到你的更新,我怀疑你不需要超时。看起来您可以在不取消订阅代表您的 RPC 的可观察对象的情况下实现您所追求的行为。我认为这样做可以简化您的逻辑,并使将来 maintain/extend 更容易。
它还避免了您之前遇到的所有异步交错问题。
在这里,我将对您所追求的行为进行有根据的猜测。看来您想关注:
- 从源中取一个值(在本例中恰好是一个 RPC)。
- 如果该值需要超过 1 秒才能到达,则发出“仍在工作...”并继续等待来自源的一个值。
如果是这样,您根本不需要 share
。通常,超时意味着如果花费的时间太长则取消飞行中的请求。也许您根本不想超时,您想继续订阅源...
如果是这样的话,这里有一个方法可以做到这一点:
const source$ = rpc(arg1 arg2);
// create a unique token (its memory address).
// We'll embed a message inside so it's doing double duty
const token = {a: "still working..."};
merge(
source$,
timer(1000).pipe(mapTo(token))
).pipe(
// take(1), but ignoring the token
takeWhile(v => v === token, true),
// Unwrap the token, emit the contained message
map(v => v === token ? v.a : v)
);
另一方面,如果您知道可观察包装您的 RPC 永远不会发出“仍在工作...”,那么您不需要唯一令牌,您可以简化它以直接检查值。
merge(
rpc(arg1 arg2),
timer(1000).pipe(mapTo("still working..."))
).pipe(
takeWhile(v => v === "still working...", true)
);
share 和 shareReplay(refcount:true)取消订阅的方式似乎存在奇怪的差异。
考虑以下内容(可以粘贴到 rxviz.com):
const { interval } = Rx;
const { take, shareReplay, share, timeoutWith, startWith , finalize} = RxOperators;
const shareReplay$ = interval(2000).pipe(
finalize(() => console.log('[finalize] Called shareReplay$')),
take(1),
shareReplay({refcount:true, bufferSize: 0}));
shareReplay$.pipe(
timeoutWith(1000, shareReplay$.pipe(startWith('X'))),
)
const share$ = interval(2000).pipe(
finalize(() => console.log('[finalize] Called on share$')),
take(1),
share());
share$.pipe(
timeoutWith(1000, share$.pipe(startWith('X'))),
)
streamReplay$ 的输出将为 -X-0-,而 shareStream$ 的输出将为 -X--0。似乎共享在 timeoutWith 可以重新订阅之前从源取消订阅,而 shareReplay 设法将共享订阅保持足够长的时间以便重新使用。
我想用它在 RPC 上添加一个本地超时(同时保持调用打开),任何重新订阅都将是灾难性的,所以想避免这些行为中的一个是错误的并得到以后改了。
我可以使用 race(),并将 rpc 调用与延迟的 startsWith 合并(因此它们同时订阅),但它会需要更多的代码和运算符。
编辑:一种可能的解决方案是合并两个订阅,一个是共享请求,另一个是延迟流,直到共享流发出:
merge(share$, of('still working...').pipe( delay(1000), takeUntil(share$)));
通过这种方式可以同时订阅共享流,因此当一个操作员取消订阅时不存在“灰色区域”。 (除非有人提出更好的建议,否则会将其转化为答案)或者可以解释 share 和 shareReplay
之间的 intentions/differences时间保证
Javascript 的 运行 时间没有任何形式的时间保证。在单线程环境中,这是有道理的(并且您可以开始使用 web workers 等做得更好)。如果 compute-heavy 发生了什么,那个时间点 window 涵盖的所有内容都会等待。不过,大多数情况下它会按预期顺序发生。
无论如何,
const hello = (name: string) => () => console.log(`Hello ${name}`);
setTimeout(hello("first"), 2000);
setTimeout(hello("second"), 2000);
在 Node、V8 或 SpiderMonkey 中,您确实会保留此处的顺序。但是这个呢?
const hello = (name: string) => () => console.log(`Hello ${name}`);
setTimeout(hello("first"), 1);
setTimeout(hello("second"), 0);
在这里你会假设第二总是第一,因为它应该早一毫秒发生。如果您 运行 使用 SpiderMonkey,则顺序取决于事件循环的繁忙程度。他们使用较短的超时时间,因为 0 毫秒的超时时间平均需要大约 8 毫秒。
始终使异步依赖显式
在 JavaScript 中,最好的做法是永远不要隐含任何时序依赖性。
在下面的代码中,我们可以合理地知道调用data.value
时data
不会是undefined。这隐含地依赖于异步交错:
let data;
setTimeout(() => {data = {value: 5};}, 1000);
setTimeout(() => console.log(data.value), 2000);
我们真的应该明确说明这种依赖关系。通过检查数据是否未定义或通过重组我们的调用
setTimeout(() => {
const data = {value: 5};
setTimeout(() => console.log(data.value), 1000);
}, 1000);
分享与 ShareReplay
want to avoid the risk one of these behaviors is a mistake and gets changed in the future.
这里真正的风险甚至不在于图书馆如何实现差异。这也是 language-level 的风险。无论哪种方式,您都依赖于异步交错。你 运行 有一个 bug 的风险,这个 bug 只出现一次在蓝色月亮上并且不容易 re-created/tested 等等
分享运营商有 ShareConfig
(source)
export interface ShareConfig<T> {
connector?: () => SubjectLike<T>;
resetOnError?: boolean | ((error: any) => Observable<any>);
resetOnComplete?: boolean | (() => Observable<any>);
resetOnRefCountZero?: boolean | (() => Observable<any>);
}
如果您使用 vanilla shareReplay(1)
或 replay({resetOnRefCountZero: false})
,那么您不依赖于 JS 事件循环中事件的排序方式。
速记:
我怀疑在描述您所追求的行为之前,您不会得到很多答案。
use this to add a local timeout on an RPC (while keeping the call open)
我不确定保持 RPC 调用打开需要什么。这听起来像是您想要取消 RxJS 中的请求但不取消飞行中的 RPC 的东西。我觉得那是域不匹配。为什么不让您的可观察对象与它们所代表的调用的语义保持一致?
寻求解决方案
看到你的更新,我怀疑你不需要超时。看起来您可以在不取消订阅代表您的 RPC 的可观察对象的情况下实现您所追求的行为。我认为这样做可以简化您的逻辑,并使将来 maintain/extend 更容易。
它还避免了您之前遇到的所有异步交错问题。
在这里,我将对您所追求的行为进行有根据的猜测。看来您想关注:
- 从源中取一个值(在本例中恰好是一个 RPC)。
- 如果该值需要超过 1 秒才能到达,则发出“仍在工作...”并继续等待来自源的一个值。
如果是这样,您根本不需要 share
。通常,超时意味着如果花费的时间太长则取消飞行中的请求。也许您根本不想超时,您想继续订阅源...
如果是这样的话,这里有一个方法可以做到这一点:
const source$ = rpc(arg1 arg2);
// create a unique token (its memory address).
// We'll embed a message inside so it's doing double duty
const token = {a: "still working..."};
merge(
source$,
timer(1000).pipe(mapTo(token))
).pipe(
// take(1), but ignoring the token
takeWhile(v => v === token, true),
// Unwrap the token, emit the contained message
map(v => v === token ? v.a : v)
);
另一方面,如果您知道可观察包装您的 RPC 永远不会发出“仍在工作...”,那么您不需要唯一令牌,您可以简化它以直接检查值。
merge(
rpc(arg1 arg2),
timer(1000).pipe(mapTo("still working..."))
).pipe(
takeWhile(v => v === "still working...", true)
);