具有实时缓冲区输出的 RXJS 计数器

RXJS Counter with live buffer output

我正在尝试使用 RXJS 实现一种计数器。有一个主题(数字)可以逐步递增或递减。最后,它应该发出计数器值,可以是正数或负数。

如下例所示,输入去抖一秒。这意味着,可观察对象在一秒钟不活动和 returns 值的总和后发出。

const mySubject$: Subject<number> = new Subject<number>;

get counter$(): Observable<number> {
  return this.mySubject$.pipe(
    buffer(this.mySubject$.pipe(debounceTime(1000))),
    map(steps => steps.reduce((accumulator, currentValue) => accumulator + currentValue, 0))
  );
}

increment(): void {
  this.mySubject$.next(1);
}

reduce(): void {
  this.mySubject$.next(-1);
}

到目前为止还不错...但是现在,我想在 increment/reduce 上的某处实时显示当前计数器值,然后它会在一秒钟后发出。有没有办法在去抖动时间结束之前访问缓冲数组?

我可以在 observable 之外创建另一个计数器变量,并将值放在 buffer() 之前的 tap() 中。但这似乎不是 RXJS 的方式?

const mySubject$: Subject<number> = new Subject<number>;
const counter: number = 0;

get counter$(): Observable<number> {
  return this.mySubject$.pipe(
    tap(value => this.counter += value),
    buffer(this.mySubject$.pipe(debounceTime(1000))),
    map(steps => steps.reduce((accumulator, currentValue) => accumulator + currentValue, 0)),
    tap(() => this.counter = 0)
  );
}

任何关于干净方式的想法,如何做到这一点?


代码的用途:我在电视上有一个应用 运行。在流式传输期间,用户可以使用遥控器上的两个专用键以 30 秒为步长 "jump seek" 后退或前进。例如,如果用户连续按下按钮 3 次,则流向后或向前跳转 90 秒。在按键动作之间有一秒的延迟,在累积值应该发出之前。但是每次用户按下按钮时,都会有一个可见的输出(如祝酒词)显示跳跃搜索将发射多少秒。

您也许可以尝试使用扫描,它会为您完成求和工作,因此您可以在某处提示它并仍然对输出进行去抖动。

get counter$(): Observable<number> {
  return this.mySubject$.pipe(
    scan((acc, delta) => delta ? acc + delta : 0, 0),
    tap((value) => *you can use the total accumulator so far here*),
    debounceTime(1000),
  );
}

要重置计数器,您只需发送一个空值即可。

我会使用一个主题 (_jumpBy$),用于触发命令,以及两个映射的可观察对象:一个 (jumpByToShow$) 立即显示通过 scan 累积的值,另一个对第一个的发射进行去抖动,并在发射一个值时将累积值清零,使主体发射一个 -amount 值。

未经测试,希望对您有所帮助。

  private _jumpBy$ = new Subject<number>();

  // the quantity changing immediately, to be shown on screen
  public jumpByToShow$: Observable<number>;

  // the quantity debounced
  public jumpByToExecute$: Observable<number>;

  constructor(/* ... */) {
    this.jumpByToShow$ = this._jumpBy$.pipe(
      startWith(0),
      scan((amount, newAmount) => (amount + newAmount), 0),
      share()
    );

    this.jumpByToExecute$ = this.jumpByToShow$.pipe(
      debounceTime(1000),
      filter(amount => amount !== 0),
      tap(amount => this._jumpBy$.next(-amount))
    );
  }