使用 RxJS 暂停和恢复 Observable Stream,并仅在暂停期间从其他流发出自定义消息

Pause and resume Observable Stream and only emit custom messages from other stream during that pause with RxJS

我有一个 hot observable 不断发出消息。我需要使用 API REST 端点(如执行器 /messages/pause)暂停它,并使用其他 API REST 端点(如 /messages/resume)恢复它。在暂停期间,我需要允许其他 API REST 端点发出类似于原始可观察对象 /messages/custom.

的消息

可以在该间隔期间暂停主要可观察流(暂停-恢复)但不停止观察模拟消息事件,并且continue/restore恢复后主要可观察流?

我认为这是一种方法:

// The 2 sources of events.
const main$ = /* ... */;
const second$ = /* ... */;

// Emits when the `/pause` endpoint is reached.
// `pause.next(true);`
// When the `/resume` endpoint is reached: `pause.next(false)`
const pause = new Subject<boolean>();

const decoratedMain$ = main$.pipe(
  withLatestFrom(paused.pipe(startWith(false))),
  filter(([mainValue, isPaused]) => !isPaused),
  map(([mainValue]) => mainValue),
);

const decoratedSecond$ = second$.pipe(
  withLatestFrom(paused),
  filter(([secondValue, isPaused]) => isPaused),
  map(([secondValue]) => secondValue),
);

merge(decoratedMain$, decoratedSecond$).subscribe();

上面的代码片段,除非我遗漏了什么,否则应该实现这个逻辑:

  • main$ observable 发出时,不会考虑 second$ observable 的事件
  • main$ 可以暂停(使用 pause.next(true))和恢复(使用 pause.next(false)
  • main$ 暂停时,它的事件将被忽略,现在考虑 second$ observable 的事件
  • main$ 恢复时,开关 再次发生:second$ 的事件被忽略,main$ 的事件被忽略考虑

现在让我们看看使用了 RxJS 的哪一部分魔法来实现这一点。

您可能已经注意到,主要逻辑围绕 withLatestFrompause 主题展开。

decoratedMain$ observable 持续发射,但其事件是否被忽略取决于 pause 的最新值。如果到达 /pause 端点,则事件将被忽略,因为 pause 会发出 true.

decoratedSecond$ 是对称建造的。如果 pause 的最新值为 false.

,则忽略其事件

最后,我认为分享上述方法的一个小变体会有所帮助,仅供学习之用:

/* ... */
const isMainPaused$ = pause.pipe(filter(Boolean));
const isMainResumed$ = pause.pipe(filter(v => !v));

const decoratedMain$ = main$.pipe(
  share({ resetOnRefCountZero: false }),
  takeUntil(isMainPaused$),
  repeatWhen(completionsSubject => completionsSubject.pipe(mergeMapTo(isMainResumed$)))
);

/* ... */

其余代码与第一种方法相同。在这里,发生的事情实质上是通过不让任何订阅者订阅 shareSubject 实例来忽略 main$ 的事件。当 pause 发出 false 时,将为该 Subject 实例创建一个新的订阅者,没有 re-subscribing 到源,这是由于 resetOnRefCountZero: false 选项。