使用 RxJS 暂停和恢复 Observable Stream,并仅在暂停期间从其他流发出自定义消息
Pause and resume Observable Stream and only emit custom messages from other stream during that pause with RxJS
我有一个 hot observable 不断发出消息。我需要使用 API REST 端点(如执行器 /messages/pause
)暂停它,并使用其他 API REST 端点(如 /messages/resume
)恢复它。在暂停期间,我需要允许其他 API REST 端点发出类似于原始可观察对象 /messages/custom
.
的消息
可以在该间隔期间暂停主要可观察流(暂停-恢复)但不停止观察模拟消息事件,并且continue/restore恢复后主要可观察流?
我认为这是一种方法:
// The 2 sources of events.
const main$ = /* ... */;
const second$ = /* ... */;
// Emits when the `/pause` endpoint is reached.
// `pause.next(true);`
// When the `/resume` endpoint is reached: `pause.next(false)`
const pause = new Subject<boolean>();
const decoratedMain$ = main$.pipe(
withLatestFrom(paused.pipe(startWith(false))),
filter(([mainValue, isPaused]) => !isPaused),
map(([mainValue]) => mainValue),
);
const decoratedSecond$ = second$.pipe(
withLatestFrom(paused),
filter(([secondValue, isPaused]) => isPaused),
map(([secondValue]) => secondValue),
);
merge(decoratedMain$, decoratedSecond$).subscribe();
上面的代码片段,除非我遗漏了什么,否则应该实现这个逻辑:
- 当
main$
observable 发出时,不会考虑 second$
observable 的事件
main$
可以暂停(使用 pause.next(true)
)和恢复(使用 pause.next(false)
)
- 当
main$
暂停时,它的事件将被忽略,现在考虑 second$
observable 的事件
- 当
main$
恢复时,开关 再次发生:second$
的事件被忽略,main$
的事件被忽略考虑
现在让我们看看使用了 RxJS 的哪一部分魔法来实现这一点。
您可能已经注意到,主要逻辑围绕 withLatestFrom
和 pause
主题展开。
decoratedMain$
observable 持续发射,但其事件是否被忽略取决于 pause
的最新值。如果到达 /pause
端点,则事件将被忽略,因为 pause
会发出 true
.
decoratedSecond$
是对称建造的。如果 pause
的最新值为 false
.
,则忽略其事件
最后,我认为分享上述方法的一个小变体会有所帮助,仅供学习之用:
/* ... */
const isMainPaused$ = pause.pipe(filter(Boolean));
const isMainResumed$ = pause.pipe(filter(v => !v));
const decoratedMain$ = main$.pipe(
share({ resetOnRefCountZero: false }),
takeUntil(isMainPaused$),
repeatWhen(completionsSubject => completionsSubject.pipe(mergeMapTo(isMainResumed$)))
);
/* ... */
其余代码与第一种方法相同。在这里,发生的事情实质上是通过不让任何订阅者订阅 share
的 Subject
实例来忽略 main$
的事件。当 pause
发出 false
时,将为该 Subject
实例创建一个新的订阅者,没有 re-subscribing 到源,这是由于 resetOnRefCountZero: false
选项。
我有一个 hot observable 不断发出消息。我需要使用 API REST 端点(如执行器 /messages/pause
)暂停它,并使用其他 API REST 端点(如 /messages/resume
)恢复它。在暂停期间,我需要允许其他 API REST 端点发出类似于原始可观察对象 /messages/custom
.
可以在该间隔期间暂停主要可观察流(暂停-恢复)但不停止观察模拟消息事件,并且continue/restore恢复后主要可观察流?
我认为这是一种方法:
// The 2 sources of events.
const main$ = /* ... */;
const second$ = /* ... */;
// Emits when the `/pause` endpoint is reached.
// `pause.next(true);`
// When the `/resume` endpoint is reached: `pause.next(false)`
const pause = new Subject<boolean>();
const decoratedMain$ = main$.pipe(
withLatestFrom(paused.pipe(startWith(false))),
filter(([mainValue, isPaused]) => !isPaused),
map(([mainValue]) => mainValue),
);
const decoratedSecond$ = second$.pipe(
withLatestFrom(paused),
filter(([secondValue, isPaused]) => isPaused),
map(([secondValue]) => secondValue),
);
merge(decoratedMain$, decoratedSecond$).subscribe();
上面的代码片段,除非我遗漏了什么,否则应该实现这个逻辑:
- 当
main$
observable 发出时,不会考虑second$
observable 的事件 main$
可以暂停(使用pause.next(true)
)和恢复(使用pause.next(false)
)- 当
main$
暂停时,它的事件将被忽略,现在考虑second$
observable 的事件 - 当
main$
恢复时,开关 再次发生:second$
的事件被忽略,main$
的事件被忽略考虑
现在让我们看看使用了 RxJS 的哪一部分魔法来实现这一点。
您可能已经注意到,主要逻辑围绕 withLatestFrom
和 pause
主题展开。
decoratedMain$
observable 持续发射,但其事件是否被忽略取决于 pause
的最新值。如果到达 /pause
端点,则事件将被忽略,因为 pause
会发出 true
.
decoratedSecond$
是对称建造的。如果 pause
的最新值为 false
.
最后,我认为分享上述方法的一个小变体会有所帮助,仅供学习之用:
/* ... */
const isMainPaused$ = pause.pipe(filter(Boolean));
const isMainResumed$ = pause.pipe(filter(v => !v));
const decoratedMain$ = main$.pipe(
share({ resetOnRefCountZero: false }),
takeUntil(isMainPaused$),
repeatWhen(completionsSubject => completionsSubject.pipe(mergeMapTo(isMainResumed$)))
);
/* ... */
其余代码与第一种方法相同。在这里,发生的事情实质上是通过不让任何订阅者订阅 share
的 Subject
实例来忽略 main$
的事件。当 pause
发出 false
时,将为该 Subject
实例创建一个新的订阅者,没有 re-subscribing 到源,这是由于 resetOnRefCountZero: false
选项。