RXJS - 仅在空闲时启动计时器?

RXJS - start a timer only when idle?

我使用的流在滚动 window 时受到限制。
在节流时(只要滚动),它会向控制台发出值。

但是,当流空闲时(用户 没有 滚动 window) - 我想要一个计时器启动。但是 - 如果用户再次开始滚动- 我不希望那个计时器发出值。

目前我正在这样做:

  const observable = Rx.Observable.fromEvent(window, 'scroll');

  const subscriber = observable
      .throttleTime(300 )
      .map(() => 'throttle')
      .merge(Rx.Observable.interval(1000).map(() => 'tick') )
      .subscribe(
          (x) => {
            console.log('Next: event!', x);
          },
          (err) => {
            console.log('Error: %s', err);
          },
          () => {
            console.log('Completed');
          });

问题是,在滚动时 - 我同时看到 "throttle""tick"(我应该只看到 "throttle")

从另一个视角思考这个问题。工作总是必须 运行。如果我滚动 - 那个受限的滚动 - 应该调用该作业。如果我不滚动 - 计时器应该启动并开始完成工作。 (如果用户再次开始滚动则停止)。

问题:
如何在不滚动的空闲时间后启动计时器?

PLNKR

我会这样做:

const scroll$ = Rx.Observable.fromEvent(window, 'scroll')
    .throttleTime(300 /* ms */)
    .publish();

scroll$.connect();

const subscriber = scroll$
    .map(() => 'throttle')
    .race(Rx.Observable.interval(1000).map(() => 'tick'))
    .take(1)
    .repeat()
    .subscribe(
        (x) => {
          console.log('Next: event!', x);
        },
        (err) => {
          console.log('Error: %s', err);
        },
        () => {
          console.log('Completed');
        });

这使用 race() 运算符仅订阅首先发出的 Observable,即 1s interval 或滚动事件。在那之后我想用另一个间隔再次开始,所以我使用 take(1).repeat().

我还必须将 scroll$ Observable 变成热 Observable,以在重复订阅中保留 throttleTime() 运行。

您更新的演示:https://plnkr.co/edit/sWzSm32uoOQ1hOKigo4s?p=preview

您可以使用 debounceTime 来检测经期而无需滚动。

const scroll = Rx.Observable.fromEvent(window, 'scroll')
  .throttleTime(300)
  .mapTo(false);
const noscroll = Rx.Observable.fromEvent(window, 'scroll')
  .startWith(0) // init with no scroll.
  .debounceTime(300) // detect no scroll after 300 ms.
  .mapTo(true);
scroll.merge(noscroll)
  .switchMap(e => e ? Rx.Observable.interval(1000).mapTo("Tick!") : Rx.Observable.of("Scroll!"))  
  // start the interval if there was no scroll. Stop the interval if there was a scroll.
  .subscribe(updateTimer)

您的代码的另一个问题是使用 merge that will keep both sources subscribed, instead i use switchMap (a sibling of mergeMap),它会在每次发出新事件时订阅内部可观察对象,但如果从源发出另一个事件,也会取消订阅之前的内部源。

回复:问题的"another POV"部分:您可以将switchMap中的Rx.Observable.interval(1000)替换为作业。滚动将 cancel/unsubscribe 作业(因为 empty 已发出),如果不再滚动,作业将重新开始。

Live demo