如何为 RxJS 计时器添加停止和启动功能?

How to add a stop and start feature for an RxJS timer?

我添加了开始、停止、暂停按钮。 Start 将启动一个倒计时计时器,该计时器将从一个值开始,一直递减直到值达到 0。我们可以通过单击暂停按钮暂停计时器。单击“停止”时,定时器观察也完成。

const subscription = merge(
  startClick$.pipe(mapTo(true)),
  pauseBtn$.pipe(mapTo(false))
)
  .pipe(
    tap(val => {
      console.log(val);
    }),
    switchMap(val => (val ? interval(10).pipe(takeUntil(stopClick$)) : EMPTY)),
    mapTo(-1),
    scan((acc: number, curr: number) => acc + curr, startValue),
    takeWhile(val => val >= 0),
    repeatWhen(() => startClick$),
    startWith(startValue)
  )
  .subscribe(val => {
    counterDisplayHeader.innerHTML = val.toString();
  });

Stackblitz 代码 link 可用 here

这是一个向上计数而不是向下计数的秒表示例。也许你可以重新加工它。

type StopwatchAction = "START" | "STOP" | "RESET" | "END";

function createStopwatch(
  control$: Observable<StopwatchAction>, 
  interval = 1000
): Observable<number>{

  return defer(() => {
    let toggle: boolean = false;
    let count: number = 0;

    const ticker = timer(0, interval).pipe(
      map(x => count++)
    );
    const end$ = of("END");

    return concat(
      control$,
      end$
    ).pipe(
      catchError(_ => end$),
      switchMap(control => {
        if(control === "START" && !toggle){
          toggle = true;
          return ticker;
        }else if(control === "STOP" && toggle){
          toggle = false;
          return EMPTY;
        }else if(control === "RESET"){
          count = 0;
          if(toggle){
            return ticker;
          }
        }
        return EMPTY;
      })
    );
  });
}

这是一个使用中的例子:

const start$: Observable<StopwatchAction> = fromEvent(startBtn, 'click').pipe(mapTo("START"));
const reset$: Observable<StopwatchAction> = fromEvent(resetBtn, 'click').pipe(mapTo("RESET"));

createStopwatch(merge(start$,reset$)).subscribe(seconds => {
  secondsField.innerHTML  = seconds % 60;
  minuitesField.innerHTML = Math.floor(seconds / 60) % 60;
  hoursField.innerHTML    = Math.floor(seconds / 3600);
});

这是一个相当复杂的用例。我认为有两个问题:

  • 您对 startClick$ 有两个订阅,在这种情况下订阅的顺序很重要。当链完成时,repeatWhen 正在等待 startClick$ 发出。但是,当您单击该按钮时,发射首先传播到 merge(...) 内的第一个订阅并且什么都不做,因为链已经完成。只有在那之后,由于 repeatWhen,它才会重新订阅,但您必须再次按下按钮才能触发 switchMap() 运算符。

  • 当你使用 repeatWhen() 时,它会在每次内部 Observable 发射时重新订阅,所以你希望它在 startClick$ 上发射但只发射一次。同时你不希望它完成所以你需要使用这样的东西:

    repeatWhen(notifier$ => notifier$.pipe(
      switchMap(() => startClick$.pipe(take(1))),
    )),
    

因此,为了避免所有我认为您可以使用 takeUntil(stopClick$) 完成链,然后立即使用 repeat() 重新订阅以重新开始。

merge(
  startClick$.pipe(mapTo(true)),
  pauseBtn$.pipe(mapTo(false))
)
  .pipe(
    switchMap(val => (val ? interval(10) : EMPTY)),
    mapTo(-1),
    scan((acc: number, curr: number) => acc + curr, startValue),
    takeWhile(val => val >= 0),
    startWith(startValue),
    takeUntil(stopClick$),
    repeat(),
  )
  .subscribe(val => {
    counterDisplayHeader.innerHTML = val.toString();
  });

您更新的演示:https://stackblitz.com/edit/rxjs-tum4xq?file=index.ts

您可以使用 takeUntilrepeatWhen 或其他运算符以另一种方式实现这一点,而无需完成主要可观察对象或重新订阅它,如下所示:

  • 创建一个简单的 state 来处理 counter 更改(countisTicking
  • merge 在一个可观察对象中影响计数器的所有可观察对象。
  • 创建中间 observable 以与主要 merge observable 交互(start/stop 计数)。
interface CounterStateModel {
  count: number;
  isTicking: boolean;
}

// Setup counter state
const initialCounterState: CounterStateModel = {
  count: startValue,
  isTicking: false
};

const patchCounterState = new Subject<Partial<CounterStateModel>>();
const counterCommands$ = merge(
  startClick$.pipe(mapTo({ isTicking: true })),
  pauseBtn$.pipe(mapTo({ isTicking: false })),
  stopClick$.pipe(mapTo({ ...initialCounterState })),
  patchCounterState.asObservable()
);

const counterState$: Observable<CounterStateModel> = counterCommands$.pipe(
  startWith(initialCounterState),
  scan(
    (counterState: CounterStateModel, command): CounterStateModel => ({
      ...counterState,
      ...command
    })
  ),
  shareReplay(1)
);

const isTicking$ = counterState$.pipe(
  map(state => state.isTicking),
  distinctUntilChanged()
);

const commandFromTick$ = isTicking$.pipe(
  switchMap(isTicking => (isTicking ? timer(0, 10) : NEVER)),
  withLatestFrom(counterState$, (_, counterState) => ({
    count: counterState.count
  })),
  tap(({ count }) => {
    if (count) {
      patchCounterState.next({ count: count - 1 });
    } else {
      patchCounterState.next({ ...initialCounterState });
    }
  })
);

const commandFromReset$ = stopClick$.pipe(mapTo({ ...initialCounterState }));

merge(commandFromTick$, commandFromReset$)
  .pipe(startWith(initialCounterState))
  .subscribe(
    state => (counterDisplayHeader.innerHTML = state.count.toString())
  );

这里还有工作版本: https://stackblitz.com/edit/rxjs-o86zg5