如何为 RxJS 计时器添加停止和启动功能?
How to add a stop and start feature for an RxJS timer?
我添加了开始、停止、暂停按钮。 Start 将启动一个倒计时计时器,该计时器将从一个值开始,一直递减直到值达到 0。我们可以通过单击暂停按钮暂停计时器。单击“停止”时,定时器观察也完成。
但是,一旦计时器完成(当值达到 0 或
单击停止按钮时),我无法正常启动。我
尝试添加 repeatWhen 运算符。它开始点击两次。不在
第一次。
此外,在停止时,值不会重置回初始值。
const subscription = merge(
startClick$.pipe(mapTo(true)),
pauseBtn$.pipe(mapTo(false))
)
.pipe(
tap(val => {
console.log(val);
}),
switchMap(val => (val ? interval(10).pipe(takeUntil(stopClick$)) : EMPTY)),
mapTo(-1),
scan((acc: number, curr: number) => acc + curr, startValue),
takeWhile(val => val >= 0),
repeatWhen(() => startClick$),
startWith(startValue)
)
.subscribe(val => {
counterDisplayHeader.innerHTML = val.toString();
});
Stackblitz 代码 link 可用 here
这是一个向上计数而不是向下计数的秒表示例。也许你可以重新加工它。
type StopwatchAction = "START" | "STOP" | "RESET" | "END";
function createStopwatch(
control$: Observable<StopwatchAction>,
interval = 1000
): Observable<number>{
return defer(() => {
let toggle: boolean = false;
let count: number = 0;
const ticker = timer(0, interval).pipe(
map(x => count++)
);
const end$ = of("END");
return concat(
control$,
end$
).pipe(
catchError(_ => end$),
switchMap(control => {
if(control === "START" && !toggle){
toggle = true;
return ticker;
}else if(control === "STOP" && toggle){
toggle = false;
return EMPTY;
}else if(control === "RESET"){
count = 0;
if(toggle){
return ticker;
}
}
return EMPTY;
})
);
});
}
这是一个使用中的例子:
const start$: Observable<StopwatchAction> = fromEvent(startBtn, 'click').pipe(mapTo("START"));
const reset$: Observable<StopwatchAction> = fromEvent(resetBtn, 'click').pipe(mapTo("RESET"));
createStopwatch(merge(start$,reset$)).subscribe(seconds => {
secondsField.innerHTML = seconds % 60;
minuitesField.innerHTML = Math.floor(seconds / 60) % 60;
hoursField.innerHTML = Math.floor(seconds / 3600);
});
这是一个相当复杂的用例。我认为有两个问题:
您对 startClick$
有两个订阅,在这种情况下订阅的顺序很重要。当链完成时,repeatWhen
正在等待 startClick$
发出。但是,当您单击该按钮时,发射首先传播到 merge(...)
内的第一个订阅并且什么都不做,因为链已经完成。只有在那之后,由于 repeatWhen
,它才会重新订阅,但您必须再次按下按钮才能触发 switchMap()
运算符。
当你使用 repeatWhen()
时,它会在每次内部 Observable 发射时重新订阅,所以你希望它在 startClick$
上发射但只发射一次。同时你不希望它完成所以你需要使用这样的东西:
repeatWhen(notifier$ => notifier$.pipe(
switchMap(() => startClick$.pipe(take(1))),
)),
因此,为了避免所有我认为您可以使用 takeUntil(stopClick$)
完成链,然后立即使用 repeat()
重新订阅以重新开始。
merge(
startClick$.pipe(mapTo(true)),
pauseBtn$.pipe(mapTo(false))
)
.pipe(
switchMap(val => (val ? interval(10) : EMPTY)),
mapTo(-1),
scan((acc: number, curr: number) => acc + curr, startValue),
takeWhile(val => val >= 0),
startWith(startValue),
takeUntil(stopClick$),
repeat(),
)
.subscribe(val => {
counterDisplayHeader.innerHTML = val.toString();
});
您更新的演示:https://stackblitz.com/edit/rxjs-tum4xq?file=index.ts
您可以使用 takeUntil
、repeatWhen
或其他运算符以另一种方式实现这一点,而无需完成主要可观察对象或重新订阅它,如下所示:
- 创建一个简单的
state
来处理 counter
更改(count
、isTicking
)
merge
在一个可观察对象中影响计数器的所有可观察对象。
- 创建中间 observable 以与主要
merge
observable 交互(start/stop 计数)。
interface CounterStateModel {
count: number;
isTicking: boolean;
}
// Setup counter state
const initialCounterState: CounterStateModel = {
count: startValue,
isTicking: false
};
const patchCounterState = new Subject<Partial<CounterStateModel>>();
const counterCommands$ = merge(
startClick$.pipe(mapTo({ isTicking: true })),
pauseBtn$.pipe(mapTo({ isTicking: false })),
stopClick$.pipe(mapTo({ ...initialCounterState })),
patchCounterState.asObservable()
);
const counterState$: Observable<CounterStateModel> = counterCommands$.pipe(
startWith(initialCounterState),
scan(
(counterState: CounterStateModel, command): CounterStateModel => ({
...counterState,
...command
})
),
shareReplay(1)
);
const isTicking$ = counterState$.pipe(
map(state => state.isTicking),
distinctUntilChanged()
);
const commandFromTick$ = isTicking$.pipe(
switchMap(isTicking => (isTicking ? timer(0, 10) : NEVER)),
withLatestFrom(counterState$, (_, counterState) => ({
count: counterState.count
})),
tap(({ count }) => {
if (count) {
patchCounterState.next({ count: count - 1 });
} else {
patchCounterState.next({ ...initialCounterState });
}
})
);
const commandFromReset$ = stopClick$.pipe(mapTo({ ...initialCounterState }));
merge(commandFromTick$, commandFromReset$)
.pipe(startWith(initialCounterState))
.subscribe(
state => (counterDisplayHeader.innerHTML = state.count.toString())
);
这里还有工作版本:
https://stackblitz.com/edit/rxjs-o86zg5
我添加了开始、停止、暂停按钮。 Start 将启动一个倒计时计时器,该计时器将从一个值开始,一直递减直到值达到 0。我们可以通过单击暂停按钮暂停计时器。单击“停止”时,定时器观察也完成。
但是,一旦计时器完成(当值达到 0 或 单击停止按钮时),我无法正常启动。我 尝试添加 repeatWhen 运算符。它开始点击两次。不在 第一次。
此外,在停止时,值不会重置回初始值。
const subscription = merge(
startClick$.pipe(mapTo(true)),
pauseBtn$.pipe(mapTo(false))
)
.pipe(
tap(val => {
console.log(val);
}),
switchMap(val => (val ? interval(10).pipe(takeUntil(stopClick$)) : EMPTY)),
mapTo(-1),
scan((acc: number, curr: number) => acc + curr, startValue),
takeWhile(val => val >= 0),
repeatWhen(() => startClick$),
startWith(startValue)
)
.subscribe(val => {
counterDisplayHeader.innerHTML = val.toString();
});
Stackblitz 代码 link 可用 here
这是一个向上计数而不是向下计数的秒表示例。也许你可以重新加工它。
type StopwatchAction = "START" | "STOP" | "RESET" | "END";
function createStopwatch(
control$: Observable<StopwatchAction>,
interval = 1000
): Observable<number>{
return defer(() => {
let toggle: boolean = false;
let count: number = 0;
const ticker = timer(0, interval).pipe(
map(x => count++)
);
const end$ = of("END");
return concat(
control$,
end$
).pipe(
catchError(_ => end$),
switchMap(control => {
if(control === "START" && !toggle){
toggle = true;
return ticker;
}else if(control === "STOP" && toggle){
toggle = false;
return EMPTY;
}else if(control === "RESET"){
count = 0;
if(toggle){
return ticker;
}
}
return EMPTY;
})
);
});
}
这是一个使用中的例子:
const start$: Observable<StopwatchAction> = fromEvent(startBtn, 'click').pipe(mapTo("START"));
const reset$: Observable<StopwatchAction> = fromEvent(resetBtn, 'click').pipe(mapTo("RESET"));
createStopwatch(merge(start$,reset$)).subscribe(seconds => {
secondsField.innerHTML = seconds % 60;
minuitesField.innerHTML = Math.floor(seconds / 60) % 60;
hoursField.innerHTML = Math.floor(seconds / 3600);
});
这是一个相当复杂的用例。我认为有两个问题:
您对
startClick$
有两个订阅,在这种情况下订阅的顺序很重要。当链完成时,repeatWhen
正在等待startClick$
发出。但是,当您单击该按钮时,发射首先传播到merge(...)
内的第一个订阅并且什么都不做,因为链已经完成。只有在那之后,由于repeatWhen
,它才会重新订阅,但您必须再次按下按钮才能触发switchMap()
运算符。当你使用
repeatWhen()
时,它会在每次内部 Observable 发射时重新订阅,所以你希望它在startClick$
上发射但只发射一次。同时你不希望它完成所以你需要使用这样的东西:repeatWhen(notifier$ => notifier$.pipe( switchMap(() => startClick$.pipe(take(1))), )),
因此,为了避免所有我认为您可以使用 takeUntil(stopClick$)
完成链,然后立即使用 repeat()
重新订阅以重新开始。
merge(
startClick$.pipe(mapTo(true)),
pauseBtn$.pipe(mapTo(false))
)
.pipe(
switchMap(val => (val ? interval(10) : EMPTY)),
mapTo(-1),
scan((acc: number, curr: number) => acc + curr, startValue),
takeWhile(val => val >= 0),
startWith(startValue),
takeUntil(stopClick$),
repeat(),
)
.subscribe(val => {
counterDisplayHeader.innerHTML = val.toString();
});
您更新的演示:https://stackblitz.com/edit/rxjs-tum4xq?file=index.ts
您可以使用 takeUntil
、repeatWhen
或其他运算符以另一种方式实现这一点,而无需完成主要可观察对象或重新订阅它,如下所示:
- 创建一个简单的
state
来处理counter
更改(count
、isTicking
) merge
在一个可观察对象中影响计数器的所有可观察对象。- 创建中间 observable 以与主要
merge
observable 交互(start/stop 计数)。
interface CounterStateModel {
count: number;
isTicking: boolean;
}
// Setup counter state
const initialCounterState: CounterStateModel = {
count: startValue,
isTicking: false
};
const patchCounterState = new Subject<Partial<CounterStateModel>>();
const counterCommands$ = merge(
startClick$.pipe(mapTo({ isTicking: true })),
pauseBtn$.pipe(mapTo({ isTicking: false })),
stopClick$.pipe(mapTo({ ...initialCounterState })),
patchCounterState.asObservable()
);
const counterState$: Observable<CounterStateModel> = counterCommands$.pipe(
startWith(initialCounterState),
scan(
(counterState: CounterStateModel, command): CounterStateModel => ({
...counterState,
...command
})
),
shareReplay(1)
);
const isTicking$ = counterState$.pipe(
map(state => state.isTicking),
distinctUntilChanged()
);
const commandFromTick$ = isTicking$.pipe(
switchMap(isTicking => (isTicking ? timer(0, 10) : NEVER)),
withLatestFrom(counterState$, (_, counterState) => ({
count: counterState.count
})),
tap(({ count }) => {
if (count) {
patchCounterState.next({ count: count - 1 });
} else {
patchCounterState.next({ ...initialCounterState });
}
})
);
const commandFromReset$ = stopClick$.pipe(mapTo({ ...initialCounterState }));
merge(commandFromTick$, commandFromReset$)
.pipe(startWith(initialCounterState))
.subscribe(
state => (counterDisplayHeader.innerHTML = state.count.toString())
);
这里还有工作版本: https://stackblitz.com/edit/rxjs-o86zg5