使用 rxjs 按阈值展平数值序列
Flatten sequence of numeric values by threshold with rxjs
使用 rxjs,我得到了一个可观察的浮点数序列。现在我想过滤掉流中较小的变化,并且只在它比以前发出的值大一定数量时发出一个值。
换句话说:总是发出序列中的第一个值。然后,每个发出的(=未过滤的)值应至少比先前 发出的 值大 delta。任何不符合该条件的值都会被过滤掉。
我已经想出了一个解决方案,可以满足我的上述要求:
var obs = Rx.Observable.create(function(observer) {
/* ... */
});
var last;
obs.map(function(value) {
if (last === undefined) {
last = value;
return value;
} else {
var threshold = 0.5,
delta = Math.abs(last - value);
if (delta > threshold) {
last = value;
return value;
}
else {
return undefined;
}
}
}).
filter(function(value) {
return value !== undefined;
});
我是 rxjs 和响应式编程的新手,我认为上述解决方案过于复杂。更重要的是,它违反了反应式编程的原则,即不要在组合管道外保存状态。但我这样做了,因为我通过 last
变量跟踪并想摆脱它。
我该如何解决这个问题并以反应方式进行?
您可以使用scan
来管理您的状态:
var filtered = obs.scan({}, function (acc, value) {
if (acc.value !== undefined) {
var threshold = 0.5,
change = Math.abs(acc.value - value);
if (change < threshold) {
return { value: acc.value, isValid: false };
}
}
return { value: value, isValid: true };
})
.filter(function (acc) { return acc.isValid; })
.map(function (acc) { return acc.value; });
我知道已经回答了这个问题,但如果您发现自己经常这样做,您可以自己动手。只是随地吐痰,因为它是一个有趣的实用程序:
Observable.prototype.maxThreshold = function(threshold, selector) {
var last = null;
return this.filter(function(x) {
var n = selector ? selector(x) : x;
var delta = Math.abs(n - last);
if(last === null || delta > threshold) {
last = n;
return true;
}
return false;
});
});
可以这样使用:
streamOfNumbers.maxThreshold(0.5).
subscribe(function(x) {
console.log(x);
});
或
streamOfObjects.maxThreshold(0.5, function(x) { return x.value; }).
subscribe(function(x) {
console.log(x);
});
使用 rxjs,我得到了一个可观察的浮点数序列。现在我想过滤掉流中较小的变化,并且只在它比以前发出的值大一定数量时发出一个值。
换句话说:总是发出序列中的第一个值。然后,每个发出的(=未过滤的)值应至少比先前 发出的 值大 delta。任何不符合该条件的值都会被过滤掉。
我已经想出了一个解决方案,可以满足我的上述要求:
var obs = Rx.Observable.create(function(observer) {
/* ... */
});
var last;
obs.map(function(value) {
if (last === undefined) {
last = value;
return value;
} else {
var threshold = 0.5,
delta = Math.abs(last - value);
if (delta > threshold) {
last = value;
return value;
}
else {
return undefined;
}
}
}).
filter(function(value) {
return value !== undefined;
});
我是 rxjs 和响应式编程的新手,我认为上述解决方案过于复杂。更重要的是,它违反了反应式编程的原则,即不要在组合管道外保存状态。但我这样做了,因为我通过 last
变量跟踪并想摆脱它。
我该如何解决这个问题并以反应方式进行?
您可以使用scan
来管理您的状态:
var filtered = obs.scan({}, function (acc, value) {
if (acc.value !== undefined) {
var threshold = 0.5,
change = Math.abs(acc.value - value);
if (change < threshold) {
return { value: acc.value, isValid: false };
}
}
return { value: value, isValid: true };
})
.filter(function (acc) { return acc.isValid; })
.map(function (acc) { return acc.value; });
我知道已经回答了这个问题,但如果您发现自己经常这样做,您可以自己动手。只是随地吐痰,因为它是一个有趣的实用程序:
Observable.prototype.maxThreshold = function(threshold, selector) {
var last = null;
return this.filter(function(x) {
var n = selector ? selector(x) : x;
var delta = Math.abs(n - last);
if(last === null || delta > threshold) {
last = n;
return true;
}
return false;
});
});
可以这样使用:
streamOfNumbers.maxThreshold(0.5).
subscribe(function(x) {
console.log(x);
});
或
streamOfObjects.maxThreshold(0.5, function(x) { return x.value; }).
subscribe(function(x) {
console.log(x);
});