Rx distinctUntilChanged 允许在事件之间的可配置时间后重复
Rx distinctUntilChanged allow repetition after configurable time between events
让我们考虑一下下面的代码
Rx.Observable.merge(
Rx.Observable.just(1),
Rx.Observable.just(1).delay(1000)
).distinctUntilChanged()
.subscribe(x => console.log(x))
我们希望 1
只记录一次。然而,如果我们想要允许重复一个值,如果它的最后一次发射是在可配置的时间之前呢?我的意思是 记录两个事件。
例如,如果有类似下面的东西会很酷
Rx.Observable.merge(
Rx.Observable.just(1),
Rx.Observable.just(1).delay(1000)
).distinctUntilChanged(1000)
.subscribe(x => console.log(x))
其中 distinctUntilChanged()
接受某种超时以允许重复下一个元素。然而,这样的事情并不存在,我想知道是否有人知道一种优雅的方法来实现这一点,即使用高级运算符而不用弄乱需要处理状态的过滤器
这很有趣 use-case。我想知道是否有比我更简单的解决方案(注意我使用的是 RxJS 5):
let timedDistinctUntil = Observable.defer(() => {
let innerObs = null;
let innerSubject = null;
let delaySub = null;
function tearDown() {
delaySub.unsubscribe();
innerSubject.complete();
}
return Observable
.merge(
Observable.of(1),
Observable.of(1).delay(250), // ignored
Observable.of(1).delay(700), // ignored
Observable.of(1).delay(2000),
Observable.of(1).delay(2200), // ignored
Observable.of(2).delay(2300)
)
.do(undefined, undefined, () => tearDown())
.map(value => {
if (innerObs) {
innerSubject.next(value);
return null;
}
innerSubject = new BehaviorSubject(value);
delaySub = Observable.of(null).delay(1000).subscribe(() => {
innerObs = null;
});
innerObs = innerSubject.distinctUntilChanged();
return innerObs;
})
// filter out all skipped Observable emissions
.filter(observable => observable)
.switch();
});
timedDistinctUntil
.timestamp()
.subscribe(
value => console.log(value),
error => console.log('error: ' + error),
() => console.log('complete')
);
观看现场演示:https://jsbin.com/sivuxo/5/edit?js,console
整个逻辑被包装到 Observable.defer()
静态方法中,因为它需要一些额外的变量。
几点说明这一切是如何运作的:
merge()
是物品来源
我使用 do()
正确捕获源完成的时间,这样我就可以关闭内部计时器并发送正确的完成通知。
map()
运算符是最有趣的事情发生的地方。我重新发出它收到的值,然后 return null
如果已经有一个有效的 Observable(它是在不到 1000 毫秒之前创建的 = innerObs != null
)。然后我最终创建了一个新的主题,我将在其中重新发送所有项目和 return 这个 BehaviorSubject
与 .distinctUntilChanged()
链接。最后,我安排了 1 秒的延迟来设置 innerObs = null
,这意味着当另一个值到达时,它将 return 一个带有新 .distinctUntilChanged()
.
的新 Observable
然后 filter()
让我忽略所有 null
值 returned。这意味着它不会每秒多次发出新的 Observable。
现在我需要使用所谓的 Higher-order Observables(Observables emitting Observables。出于这个原因,我使用 switch()
运算符,它总是只订阅最新的 Observable来源。在我们的例子中,我们每秒最多只发射 Observables 一次(感谢上面使用的 filter()
)并且这个内部 Observable 本身可以发射它想要的任意数量的值,并且所有这些值都将通过 distinctUntilChanged()
所以忽略重复项。
此演示的输出类似于以下输出:
Timestamp { value: 1, timestamp: 1484670434528 }
Timestamp { value: 1, timestamp: 1484670436475 }
Timestamp { value: 2, timestamp: 1484670436577 }
complete
如您所见,值 1
以 cca 2s 延迟发出两次。然而,由于 distinctUntilChanged()
.
,值 2
在 100 毫秒后毫无问题地通过了
我知道这并不简单,但我希望它对你有意义:)
除非我误解了,否则我很确定这可以通过 windowTime
:
以相对 straight-forward 的方式完成
Observable
.merge(
Observable.of(1),
Observable.of(1).delay(250), // Ignored
Observable.of(1).delay(700), // Ignored
Observable.of(1).delay(2000),
Observable.of(1).delay(2200), //Ignored
Observable.of(2).delay(2300)
)
// Converts the stream into a stream of streams each 1000 milliseconds long
.windowTime(1000)
// Flatten each of the streams and emit only the latest (there should only be one active
// at a time anyway
// We apply the distinctUntilChanged to the windows before flattening
.switchMap(source => source.distinctUntilChanged())
.timeInterval()
.subscribe(
value => console.log(value),
error => console.log('error: ' + error),
() => console.log('complete')
);
查看示例here(借用@martin 的示例输入)
让我们考虑一下下面的代码
Rx.Observable.merge(
Rx.Observable.just(1),
Rx.Observable.just(1).delay(1000)
).distinctUntilChanged()
.subscribe(x => console.log(x))
我们希望 1
只记录一次。然而,如果我们想要允许重复一个值,如果它的最后一次发射是在可配置的时间之前呢?我的意思是 记录两个事件。
例如,如果有类似下面的东西会很酷
Rx.Observable.merge(
Rx.Observable.just(1),
Rx.Observable.just(1).delay(1000)
).distinctUntilChanged(1000)
.subscribe(x => console.log(x))
其中 distinctUntilChanged()
接受某种超时以允许重复下一个元素。然而,这样的事情并不存在,我想知道是否有人知道一种优雅的方法来实现这一点,即使用高级运算符而不用弄乱需要处理状态的过滤器
这很有趣 use-case。我想知道是否有比我更简单的解决方案(注意我使用的是 RxJS 5):
let timedDistinctUntil = Observable.defer(() => {
let innerObs = null;
let innerSubject = null;
let delaySub = null;
function tearDown() {
delaySub.unsubscribe();
innerSubject.complete();
}
return Observable
.merge(
Observable.of(1),
Observable.of(1).delay(250), // ignored
Observable.of(1).delay(700), // ignored
Observable.of(1).delay(2000),
Observable.of(1).delay(2200), // ignored
Observable.of(2).delay(2300)
)
.do(undefined, undefined, () => tearDown())
.map(value => {
if (innerObs) {
innerSubject.next(value);
return null;
}
innerSubject = new BehaviorSubject(value);
delaySub = Observable.of(null).delay(1000).subscribe(() => {
innerObs = null;
});
innerObs = innerSubject.distinctUntilChanged();
return innerObs;
})
// filter out all skipped Observable emissions
.filter(observable => observable)
.switch();
});
timedDistinctUntil
.timestamp()
.subscribe(
value => console.log(value),
error => console.log('error: ' + error),
() => console.log('complete')
);
观看现场演示:https://jsbin.com/sivuxo/5/edit?js,console
整个逻辑被包装到 Observable.defer()
静态方法中,因为它需要一些额外的变量。
几点说明这一切是如何运作的:
merge()
是物品来源我使用
do()
正确捕获源完成的时间,这样我就可以关闭内部计时器并发送正确的完成通知。map()
运算符是最有趣的事情发生的地方。我重新发出它收到的值,然后 returnnull
如果已经有一个有效的 Observable(它是在不到 1000 毫秒之前创建的 =innerObs != null
)。然后我最终创建了一个新的主题,我将在其中重新发送所有项目和 return 这个BehaviorSubject
与.distinctUntilChanged()
链接。最后,我安排了 1 秒的延迟来设置innerObs = null
,这意味着当另一个值到达时,它将 return 一个带有新.distinctUntilChanged()
. 的新 Observable
然后
filter()
让我忽略所有null
值 returned。这意味着它不会每秒多次发出新的 Observable。现在我需要使用所谓的 Higher-order Observables(Observables emitting Observables。出于这个原因,我使用
switch()
运算符,它总是只订阅最新的 Observable来源。在我们的例子中,我们每秒最多只发射 Observables 一次(感谢上面使用的filter()
)并且这个内部 Observable 本身可以发射它想要的任意数量的值,并且所有这些值都将通过distinctUntilChanged()
所以忽略重复项。
此演示的输出类似于以下输出:
Timestamp { value: 1, timestamp: 1484670434528 }
Timestamp { value: 1, timestamp: 1484670436475 }
Timestamp { value: 2, timestamp: 1484670436577 }
complete
如您所见,值 1
以 cca 2s 延迟发出两次。然而,由于 distinctUntilChanged()
.
2
在 100 毫秒后毫无问题地通过了
我知道这并不简单,但我希望它对你有意义:)
除非我误解了,否则我很确定这可以通过 windowTime
:
Observable
.merge(
Observable.of(1),
Observable.of(1).delay(250), // Ignored
Observable.of(1).delay(700), // Ignored
Observable.of(1).delay(2000),
Observable.of(1).delay(2200), //Ignored
Observable.of(2).delay(2300)
)
// Converts the stream into a stream of streams each 1000 milliseconds long
.windowTime(1000)
// Flatten each of the streams and emit only the latest (there should only be one active
// at a time anyway
// We apply the distinctUntilChanged to the windows before flattening
.switchMap(source => source.distinctUntilChanged())
.timeInterval()
.subscribe(
value => console.log(value),
error => console.log('error: ' + error),
() => console.log('complete')
);
查看示例here(借用@martin 的示例输入)