如何在下一个发出的只有一个观察者的情况下连接观察者

How to concat observables with only one observer next emitted

你好,我想用 rxjs 实现一些限制,但我无法同时实现它们。

我想要实现的目标可能会被标注为:

first$      ---x|
second$         ------x|
subscribe   -----------x|

但这就是我得到的:

first$      ---x|
second$         ------x|
subscribe   ---x------x

使用此代码:

const checkFirstSide$: Observable<boolean> = this.checkSide('first');
const checkOtherSide$: Observable<boolean> = this.checkSide('other');

concat(
    checkFirstSide$,
    checkOtherSide$
).pipe(
    timeout(15000)
).subscribe({
    next: (success) => {
        doSomething(success);
    },
    error: (error) => {
        handleError(error);
    },
    complete: () => {
        doSomethingOnComplete();
    }
});

限制条件:

  1. 他们应该一个接一个地订阅
  2. 他们应该只在前一个成功的情况下订阅(不发出错误)
  3. 一切都应在 15 秒内超时
  4. 任何错误都应该中止(执行 handleError 并完成)
  5. 观察者next函数应该只执行一次然后complete

我相信您想要的行为是通过 forkJoin 函数实现的。查看 official API reference!


编辑:对不起,我误解了你的想法!我想我现在明白了......你需要的是使用管道和 switchMap 运算符:

checkFirstSide$.pipe(
  switchMap(resFirstSide => {
    doSomething(resFirstSide);
    return checkOtherSide$;
  });
).subscribe(resOtherSide => doSomethingOnComplete());

我认为最接近的是来自官方 API 参考的 concat。但是我不确定当一个 observable 抛出时它的行为如何。

要么……

当第一个 observable 发出时切换到第二个 observable。

checkFirstSide$.pipe(
  switchMap(x => checkOtherSide$),
  timeout(15000)
)

或者从您的可观察对象中收集值并在最后发出它们。

concat(
  checkFirstSide$,
  checkOtherSide$
).pipe(
  toArray(),
  timeout(15000)
)