如果 source1 超时,RxJS 从 source2 发出,当 source1 返回时从 source1 发出
RxJS emit from source2 if source1 has timed out and when source1 is back emit from source1
我有一个发出值的 Observable 源 source1
,如果它没有发出任何东西超过 2 秒,我想切换到后备源 source2
。如果 source1
再次发射,我想从中发射。以此类推,无限期。
到目前为止,我有以下内容
import { timeout, catchError, takeUntil, concat } from 'rxjs/operators';
declare const source1: Observable;
declare const source2: Observable;
source1.pipe(
timeout(2000),
catchError(() => {
return source2.pipe(
takeUntil(source1)
);
}),
concat(source1)
).subscribe(val => console.log(val));
这几乎行得通。如果source1
在2秒后没有发射,它会从source2
发射直到source1
再次发射然后切换到source1
。但是有两个主要缺陷:
- 当
source1
再次发出时,第一个发出的值是takeUntil
的"caught"(source1
是一个热观察值)并且不会在concat(source1)
- 如果
source1
第二次停止发射,我希望有相同的行为。在我的实现中,它只工作一次。
知道如何解决这个问题吗?
我猜你可以通过共享 source1
然后使用 repeat
重新订阅同一条链来做到这一点(我没有测试):
const shared1 = source1.pipe(share());
source1.pipe(
timeout(2000),
catchError(() => merge(source1, source2).pipe(
takeUntil(source1),
)),
repeat(),
).subscribe(val => console.log(val));
我找到的解决我的第 1 点和第 2 点的解决方案如下
const source1HasStopped = source1.pipe(
timeout(2000),
catchError(() => of(1))
);
const fallback = source2.pipe(
skipUntil(source1HasStopped),
takeUntil(source1),
repeat()
);
merge(source1, fallback).subscribe(console.log);
编辑:不幸的是,这会造成订阅泄漏,因为 takeUntil
不是最后一个...
我有一个发出值的 Observable 源 source1
,如果它没有发出任何东西超过 2 秒,我想切换到后备源 source2
。如果 source1
再次发射,我想从中发射。以此类推,无限期。
到目前为止,我有以下内容
import { timeout, catchError, takeUntil, concat } from 'rxjs/operators';
declare const source1: Observable;
declare const source2: Observable;
source1.pipe(
timeout(2000),
catchError(() => {
return source2.pipe(
takeUntil(source1)
);
}),
concat(source1)
).subscribe(val => console.log(val));
这几乎行得通。如果source1
在2秒后没有发射,它会从source2
发射直到source1
再次发射然后切换到source1
。但是有两个主要缺陷:
- 当
source1
再次发出时,第一个发出的值是takeUntil
的"caught"(source1
是一个热观察值)并且不会在concat(source1)
- 如果
source1
第二次停止发射,我希望有相同的行为。在我的实现中,它只工作一次。
知道如何解决这个问题吗?
我猜你可以通过共享 source1
然后使用 repeat
重新订阅同一条链来做到这一点(我没有测试):
const shared1 = source1.pipe(share());
source1.pipe(
timeout(2000),
catchError(() => merge(source1, source2).pipe(
takeUntil(source1),
)),
repeat(),
).subscribe(val => console.log(val));
我找到的解决我的第 1 点和第 2 点的解决方案如下
const source1HasStopped = source1.pipe(
timeout(2000),
catchError(() => of(1))
);
const fallback = source2.pipe(
skipUntil(source1HasStopped),
takeUntil(source1),
repeat()
);
merge(source1, fallback).subscribe(console.log);
编辑:不幸的是,这会造成订阅泄漏,因为 takeUntil
不是最后一个...