RxJS:如何延迟下一个发射值?

RxJS: How to delay next emitted value?

我有异步功能handleParamsChanges,这可能需要几秒钟才能解决。我在 observable 发出值时调用它:

this._activatedRoute.params
  .subscribe(params => {
    this.handleParamsChanges(params).then(() => {
      // new value can be processed now
    });
  });

我如何修改我的代码,以便如果 observable 一个接一个地发出 2 个值,第一个 handleParamsChanges 被调用以获得第一个值,并且只有 这个承诺解决, 它用第二个值调用,依此类推。

编辑:

这是我想出的解决方案,但我猜还有更好的方法:

const params$ = this._activatedRoute.params;
const canExecute$ = new BehaviorSubject(true);

combineLatest(params$, canExecute$)
  .pipe(
    filter(([_, canExecute]) => canExecute),
    map(([params]) => params),
    distinctUntilChanged()
  )
  .subscribe(async params => {
    canExecute$.next(false);
    try {
      await this.handleParamsChanges(params);
    } catch (e) {
      console.log(e);
    } finally {
      canExecute$.next(true);
    }
  })

我正在使用 canExecute$ 来延迟处理新值。

我需要在这里使用 distinctUntilChanged 以避免创建无限循环。

您要找的是concatMap。它等待之前的“内部可观察”完成,然后再次订阅。你也可以大大简化你的管道:

params$.pipe(
  concatMap(params => this.handleParamsChanges(params)),
).subscribe()