播放和暂停间隔 rxjs

Play and Pause interval rxjs

我正在尝试使用 Rxjs 库实现播放和暂停按钮。

// Map each player event to a boolean flag: `true` means "run the timer",
// `false` means "stop it" (see the switchMap in player$ below).
const play$ = fromEvent(playerImplementation, PLAYER_EVENTS.PLAY).pipe(mapTo(true));
const pause$ = fromEvent(playerImplementation, PLAYER_EVENTS.PAUSE).pipe(mapTo(false));
const waiting$ = fromEvent(playerImplementation, PLAYER_EVENTS.WAITING).pipe(mapTo(false));

// Shared mutable counter. It lives outside the observable, so every
// subscription to currentTime$ increments this same variable.
let counterTime = 0;
// Every 30 ms, adds 30 to counterTime and emits the new total.
// interval() starts a separate timer for each subscriber, so with N active
// subscriptions the counter advances N * 30 per 30 ms period.
const currentTime$ = interval(30).pipe(
 map(()=>counterTime += 30));

// Subscribes to currentTime$ whenever a `true` flag arrives and switches to
// EMPTY (emits nothing) on a `false` flag, dropping the previous inner subscription.
const player$ = merge(play$, pause$, waiting$).pipe(
        switchMap(value => (value ? currentTime$ : EMPTY)));

// DIFFERENCE IN RESULTS
// Both subscriptions below (the direct one and the one made through player$'s
// switchMap) end up driving currentTime$, and therefore the same counterTime.
currentTime$.subscribe((v)=> console.log("Regular Count " + v)); // get correctly 30,60,90,120...
player$.subscribe((v)=>console.log("Condition Count" + v)); // get wrongly 30,150,270, 390

有人能帮忙解释一下,为什么这两个结果会不一样吗?

发生这种情况是因为我为同一个可观察对象(player$ 可观察对象)使用了多个订阅者:每个订阅者都会各自启动一个 interval,并且都在累加同一个计数器变量,所以数值会成倍跳动。我通过使用 ReplaySubject 代替普通的 Observable,并使用多播(multicasting)解决了这个问题,这样同一份事件可以分发给多个订阅者,而不会改变其值。

// Translate player events into a boolean flag: `true` starts the timer,
// `false` stops it (consumed by the switchMap in player$ below).
const play$ = fromEvent(playerImplementation, PLAYER_EVENTS.PLAY).pipe(mapTo(true));
const pause$ = fromEvent(playerImplementation, PLAYER_EVENTS.PAUSE).pipe(mapTo(false));
const waiting$ = fromEvent(playerImplementation, PLAYER_EVENTS.WAITING).pipe(mapTo(false));

// Elapsed-time counter, advanced by 30 on every tick of the interval below.
// It is never reset here, so counting resumes from the last value after a pause.
let timeCounter = 0;

// Wraps the 30 ms interval in an observable. The inner Subscription is returned
// as the teardown logic, so unsubscribing from `source` explicitly stops the
// interval instead of relying on a no-op teardown.
const source = Observable.create((obs: Observer<number>) =>
  interval(30)
    .pipe(map(() => (timeCounter += 30)))
    .subscribe(obs)
);

// Multicast the source through a ReplaySubject so that concurrent subscribers
// share ONE underlying interval (one increment of timeCounter per tick) rather
// than each starting their own. A late subscriber is replayed up to the last
// 5 values. refCount() connects on the first subscriber and disconnects when
// the last one unsubscribes.
const currentTime$ = source.pipe(multicast(() => new ReplaySubject<number>(5))).refCount();

// Emits the shared time values while the latest player flag is `true`;
// switches to EMPTY (no emissions) on pause/waiting.
const player$ = merge(play$, pause$, waiting$).pipe(
  switchMap((value) => (value ? currentTime$ : EMPTY))
);