Rx distinctUntilChanged 允许在事件之间的可配置时间后重复

Rx distinctUntilChanged allow repetition after configurable time between events

让我们考虑一下下面的代码

Rx.Observable.merge(
  Rx.Observable.just(1),
  Rx.Observable.just(1).delay(1000)
).distinctUntilChanged()
  .subscribe(x => console.log(x))

我们希望 1 只记录一次。然而,如果我们想要允许重复一个值,如果它的最后一次发射是在可配置的时间之前呢?我的意思是 记录两个事件

例如,如果有类似下面的东西会很酷

Rx.Observable.merge(
  Rx.Observable.just(1),
  Rx.Observable.just(1).delay(1000)
).distinctUntilChanged(1000)
  .subscribe(x => console.log(x))

其中 distinctUntilChanged() 接受某种超时以允许重复下一个元素。然而,这样的事情并不存在,我想知道是否有人知道一种优雅的方法来实现这一点,即使用高级运算符而不用弄乱需要处理状态的过滤器

这很有趣 use-case。我想知道是否有比我更简单的解决方案(注意我使用的是 RxJS 5):

let timedDistinctUntil = Observable.defer(() => {
    let innerObs = null;
    let innerSubject = null;
    let delaySub = null;

    function tearDown() {
        delaySub.unsubscribe();
        innerSubject.complete();
    }

    return Observable
        .merge(
            Observable.of(1),
            Observable.of(1).delay(250),  // ignored
            Observable.of(1).delay(700),  // ignored
            Observable.of(1).delay(2000),
            Observable.of(1).delay(2200), // ignored
            Observable.of(2).delay(2300)
        )
        .do(undefined, undefined, () => tearDown())
        .map(value => {
            if (innerObs) {
                innerSubject.next(value);
                return null;
            }

            innerSubject = new BehaviorSubject(value);

            delaySub = Observable.of(null).delay(1000).subscribe(() => {
                innerObs = null;
            });

            innerObs = innerSubject.distinctUntilChanged();
            return innerObs;
        })
        // filter out all skipped Observable emissions
        .filter(observable => observable)
        .switch();
});

timedDistinctUntil
    .timestamp()
    .subscribe(
        value => console.log(value),
        error => console.log('error: ' + error),
        () => console.log('complete')
    );

观看现场演示:https://jsbin.com/sivuxo/5/edit?js,console

整个逻辑被包装到 Observable.defer() 静态方法中,因为它需要一些额外的变量。

几点说明这一切是如何运作的:

  1. merge()是物品来源

  2. 我使用 do() 正确捕获源完成的时间,这样我就可以关闭内部计时器并发送正确的完成通知。

  3. map() 运算符是最有趣的事情发生的地方。我重新发出它收到的值,然后 return null 如果已经有一个有效的 Observable(它是在不到 1000 毫秒之前创建的 = innerObs != null)。然后我最终创建了一个新的主题,我将在其中重新发送所有项目和 return 这个 BehaviorSubject.distinctUntilChanged() 链接。最后,我安排了 1 秒的延迟来设置 innerObs = null,这意味着当另一个值到达时,它将 return 一个带有新 .distinctUntilChanged().

  4. 的新 Observable
  5. 然后 filter() 让我忽略所有 null 值 returned。这意味着它不会每秒多次发出新的 Observable。

  6. 现在我需要使用所谓的 Higher-order Observables(Observables emitting Observables。出于这个原因,我使用 switch() 运算符,它总是只订阅最新的 Observable来源。在我们的例子中,我们每秒最多只发射 Observables 一次(感谢上面使用的 filter())并且这个内部 Observable 本身可以发射它想要的任意数量的值,并且所有这些值都将通过 distinctUntilChanged() 所以忽略重复项。

此演示的输出类似于以下输出:

Timestamp { value: 1, timestamp: 1484670434528 }
Timestamp { value: 1, timestamp: 1484670436475 }
Timestamp { value: 2, timestamp: 1484670436577 }
complete

如您所见,值 1 以 cca 2s 延迟发出两次。然而,由于 distinctUntilChanged().

,值 2 在 100 毫秒后毫无问题地通过了

我知道这并不简单,但我希望它对你有意义:)

除非我误解了,否则我很确定这可以通过 windowTime:

以相对 straight-forward 的方式完成
Observable
  .merge(
   Observable.of(1),
   Observable.of(1).delay(250), // Ignored
   Observable.of(1).delay(700), // Ignored
   Observable.of(1).delay(2000),
   Observable.of(1).delay(2200), //Ignored
   Observable.of(2).delay(2300)
  )
  // Converts the stream into a stream of streams each 1000 milliseconds long
  .windowTime(1000)
  // Flatten each of the streams and emit only the latest (there should only be one active 
  // at a time anyway
  // We apply the distinctUntilChanged to the windows before flattening
  .switchMap(source => source.distinctUntilChanged())  
  .timeInterval()
  .subscribe(
    value => console.log(value),
    error => console.log('error: ' + error),
    () => console.log('complete')
  );

查看示例here(借用@martin 的示例输入)