具有反应性扩展的阈值检测
Threshold detection with reactive extensions
我在反应式扩展上花了一些时间后重写了这个问题 - 我可能知道现在非常危险所以请耐心等待并提前致谢。
我有以下检测级别的代码:
static class LevelExtensions
{
public static IEnumerable<double> GetCrossovers(this double[] self, double x1, double x2)
{
return from level in self
where level >= x1 && level <= x2 || level <= x1 && level >= x2
select level;
}
public static IObservable<ThresholdCrossedEvent> ThresholdDetection(this IObservable<double> source, double[] thresholds)
{
return source
.Buffer(2, 1)
.Where(x => x.Count == 2)
.Select(x => new { LevelsCrossed = thresholds.GetCrossovers(x[0], x[1]), Previous = x[0], Current = x[1] })
.Where(x => x.LevelsCrossed.ToList().Count > 0)
.SelectMany(x => from level in x.LevelsCrossed select new ThresholdCrossedEvent(level, x.Previous, x.Current));
}
}
public class ThresholdCrossedEvent
{
public ThresholdCrossedEvent(double level, double previous, double current)
{
Threshold = level;
Previous = previous;
Current = current;
}
public double Threshold { get; set; }
public double Previous { get; set; }
public double Current { get; set; }
public Direction SlopeDirection => Current >= Previous ? Direction.Up : Direction.Down;
public bool IsTouching => Current == Threshold || Previous == Threshold;
}
var feed = new double[] { 1, 2, 3, 5, 5.5, 6, 9, 12, 10, 9, 7.5, 6.5, 7 }.ToObservable();
var levels = new double[] { 5, 7, 8 };
feed
.ThresholdDetection(levels)
.Subscribe(x => Console.WriteLine(JsonConvert.SerializeObject(x)));
并且它按预期生成事件,到目前为止还可以:
{"Threshold":5.0,"Previous":3.0,"Current":5.0,"IsUpward":true,"IsTouching":true}
{"Threshold":5.0,"Previous":5.0,"Current":5.5,"IsUpward":true,"IsTouching":true}
{"Threshold":7.0,"Previous":6.0,"Current":9.0,"IsUpward":true,"IsTouching":false}
{"Threshold":8.0,"Previous":6.0,"Current":9.0,"IsUpward":true,"IsTouching":false}
{"Threshold":8.0,"Previous":9.0,"Current":7.5,"IsUpward":false,"IsTouching":false}
{"Threshold":7.0,"Previous":7.5,"Current":6.5,"IsUpward":false,"IsTouching":false}
{"Threshold":7.0,"Previous":6.5,"Current":7.0,"IsUpward":true,"IsTouching":true}
性能有异味吗?
在没有过早优化的情况下 - 是否还有其他任何尖叫性能问题(因为我将每秒处理 40,000 多条消息)?
我注意到我的 GetLevelsBetweenPair
水平交叉检测将依赖于 Linq.Where
每秒更新数千次,我的 levels
序列应该从低到高排序 - Linq
为我优化或我应该考虑的任何事情?
我将如何处理要在运行时更新的 levels
列表?
我在我的 feed
列表上调用 OnNext
从消息总线发布值,然后通过用户 interface/command 我 add/remove [=15] 上的一个元素=] 列表(它需要在修改后自动重新排序)。
鉴于不断检查这将是一个竞争激烈的数组(但根本不会经常更新),此级别列表更新过程应考虑哪些事项?
哪种结构允许以最有效的方式进行门控访问?
具有不同的 levels
列表并对其调用相同的订阅
我会有不同的级别“类别”,每个级别都创建相同的 ThresholdCrossedEvent,但意味着不同的东西,即 HistoricalThreshold 或 WeeklyThreshold。
作为多个订阅执行以下操作是否可以(最好),或者我应该将此阈值类型数据编码到警报中并将其从 double[] 更改为包含必要条件的 class 对象字段?
feed
.ThresholdDetection(historicalThresholds)
.Subscribe(x => Console.WriteLine("Historical " + JsonConvert.SerializeObject(x)));
feed
.ThresholdDetection(weeklyThresholds)
.Subscribe(x => Console.WriteLine("Weekly: " + JsonConvert.SerializeObject(x)));
您的实施看起来不错。几件事:
- 您还想将不断变化的
levels
建模为可观察对象。
- 我不是最好的 Linq2Objects 专家,但我认为 list/array 强制转换是不必要的并且对性能有轻微影响(如果您担心的话)。显然是测试。
- 我讨厌 Linq 语法,所以我在下面的示例中删除了它。我认为这不重要。
解决方案如下所示:
static class LevelExtensions
{
public static IEnumerable<double> GetCrossovers(this IEnumerable<double> self, double x1, double x2)
{
return self
.Where(level => level >= x1 && level <= x2 || level <= x1 && level >= x2);
}
public static IObservable<ThresholdCrossedEvent> ThresholdDetection(this IObservable<double> source, IObservable<double[]> thresholdSource)
{
return thresholdSource
.Select(thresholds => source
.Buffer(2, 1)
.Where(x => x.Count == 2)
.Select(x => new { LevelsCrossed = thresholds.GetCrossovers(x[0], x[1]), Previous = x[0], Current = x[1] })
.Where(x => x.LevelsCrossed.Count() > 0)
.SelectMany(x => x.LevelsCrossed.Select(lc => new ThresholdCrossedEvent(lc, x.Previous, x.Current)))
)
.Switch();
}
}
这是对您的代码的直接改编:唯一的区别是外部 Select
、Switch
和输入。对于每个 levels
通知,新的 Select
创建一个新的 Observable。 Switch
表示总是切换到最新的,并丢弃旧的。因此,您始终使用最新的 levels
阈值。
我用 运行 测试了下面的代码:
var feed = new double[] { 1, 2, 3, 5, 5.5, 6, 9, 12, 10, 9, 7.5, 6.5, 7 };
var levels = new double[] { 5, 7, 8 };
var feedSubject = new Subject<double>();
var levelsSubject = new BehaviorSubject<double[]>(levels);
feedSubject
.ThresholdDetection(levelsSubject)
.Subscribe(x => Console.WriteLine(JsonConvert.SerializeObject(x)));
foreach (var d in feed)
feedSubject.OnNext(d);
levelsSubject.OnNext(new double[] { 5, 8 });
Console.WriteLine("---");
foreach (var d in feed)
feedSubject.OnNext(d);
它 运行 使用您的原始阈值通过该馈送阵列一次,然后我 运行 通过它并删除 7 个阈值。我的输出是这样的:
{"Threshold":5.0,"Previous":3.0,"Current":5.0,"SlopeDirection":0,"IsTouching":true}
{"Threshold":5.0,"Previous":5.0,"Current":5.5,"SlopeDirection":0,"IsTouching":true}
{"Threshold":7.0,"Previous":6.0,"Current":9.0,"SlopeDirection":0,"IsTouching":false}
{"Threshold":8.0,"Previous":6.0,"Current":9.0,"SlopeDirection":0,"IsTouching":false}
{"Threshold":8.0,"Previous":9.0,"Current":7.5,"SlopeDirection":1,"IsTouching":false}
{"Threshold":7.0,"Previous":7.5,"Current":6.5,"SlopeDirection":1,"IsTouching":false}
{"Threshold":7.0,"Previous":6.5,"Current":7.0,"SlopeDirection":0,"IsTouching":true}
---
{"Threshold":5.0,"Previous":3.0,"Current":5.0,"SlopeDirection":0,"IsTouching":true}
{"Threshold":5.0,"Previous":5.0,"Current":5.5,"SlopeDirection":0,"IsTouching":true}
{"Threshold":8.0,"Previous":6.0,"Current":9.0,"SlopeDirection":0,"IsTouching":false}
{"Threshold":8.0,"Previous":9.0,"Current":7.5,"SlopeDirection":1,"IsTouching":false}
我在反应式扩展上花了一些时间后重写了这个问题 - 我可能知道现在非常危险所以请耐心等待并提前致谢。
我有以下检测级别的代码:
static class LevelExtensions
{
public static IEnumerable<double> GetCrossovers(this double[] self, double x1, double x2)
{
return from level in self
where level >= x1 && level <= x2 || level <= x1 && level >= x2
select level;
}
public static IObservable<ThresholdCrossedEvent> ThresholdDetection(this IObservable<double> source, double[] thresholds)
{
return source
.Buffer(2, 1)
.Where(x => x.Count == 2)
.Select(x => new { LevelsCrossed = thresholds.GetCrossovers(x[0], x[1]), Previous = x[0], Current = x[1] })
.Where(x => x.LevelsCrossed.ToList().Count > 0)
.SelectMany(x => from level in x.LevelsCrossed select new ThresholdCrossedEvent(level, x.Previous, x.Current));
}
}
public class ThresholdCrossedEvent
{
public ThresholdCrossedEvent(double level, double previous, double current)
{
Threshold = level;
Previous = previous;
Current = current;
}
public double Threshold { get; set; }
public double Previous { get; set; }
public double Current { get; set; }
public Direction SlopeDirection => Current >= Previous ? Direction.Up : Direction.Down;
public bool IsTouching => Current == Threshold || Previous == Threshold;
}
var feed = new double[] { 1, 2, 3, 5, 5.5, 6, 9, 12, 10, 9, 7.5, 6.5, 7 }.ToObservable();
var levels = new double[] { 5, 7, 8 };
feed
.ThresholdDetection(levels)
.Subscribe(x => Console.WriteLine(JsonConvert.SerializeObject(x)));
并且它按预期生成事件,到目前为止还可以:
{"Threshold":5.0,"Previous":3.0,"Current":5.0,"IsUpward":true,"IsTouching":true}
{"Threshold":5.0,"Previous":5.0,"Current":5.5,"IsUpward":true,"IsTouching":true}
{"Threshold":7.0,"Previous":6.0,"Current":9.0,"IsUpward":true,"IsTouching":false}
{"Threshold":8.0,"Previous":6.0,"Current":9.0,"IsUpward":true,"IsTouching":false}
{"Threshold":8.0,"Previous":9.0,"Current":7.5,"IsUpward":false,"IsTouching":false}
{"Threshold":7.0,"Previous":7.5,"Current":6.5,"IsUpward":false,"IsTouching":false}
{"Threshold":7.0,"Previous":6.5,"Current":7.0,"IsUpward":true,"IsTouching":true}
性能有异味吗?
在没有过早优化的情况下 - 是否还有其他任何尖叫性能问题(因为我将每秒处理 40,000 多条消息)?
我注意到我的 GetLevelsBetweenPair
水平交叉检测将依赖于 Linq.Where
每秒更新数千次,我的 levels
序列应该从低到高排序 - Linq
为我优化或我应该考虑的任何事情?
我将如何处理要在运行时更新的 levels
列表?
我在我的 feed
列表上调用 OnNext
从消息总线发布值,然后通过用户 interface/command 我 add/remove [=15] 上的一个元素=] 列表(它需要在修改后自动重新排序)。
鉴于不断检查这将是一个竞争激烈的数组(但根本不会经常更新),此级别列表更新过程应考虑哪些事项?
哪种结构允许以最有效的方式进行门控访问?
具有不同的 levels
列表并对其调用相同的订阅
我会有不同的级别“类别”,每个级别都创建相同的 ThresholdCrossedEvent,但意味着不同的东西,即 HistoricalThreshold 或 WeeklyThreshold。
作为多个订阅执行以下操作是否可以(最好),或者我应该将此阈值类型数据编码到警报中并将其从 double[] 更改为包含必要条件的 class 对象字段?
feed
.ThresholdDetection(historicalThresholds)
.Subscribe(x => Console.WriteLine("Historical " + JsonConvert.SerializeObject(x)));
feed
.ThresholdDetection(weeklyThresholds)
.Subscribe(x => Console.WriteLine("Weekly: " + JsonConvert.SerializeObject(x)));
您的实施看起来不错。几件事:
- 您还想将不断变化的
levels
建模为可观察对象。 - 我不是最好的 Linq2Objects 专家,但我认为 list/array 强制转换是不必要的并且对性能有轻微影响(如果您担心的话)。显然是测试。
- 我讨厌 Linq 语法,所以我在下面的示例中删除了它。我认为这不重要。
解决方案如下所示:
static class LevelExtensions
{
public static IEnumerable<double> GetCrossovers(this IEnumerable<double> self, double x1, double x2)
{
return self
.Where(level => level >= x1 && level <= x2 || level <= x1 && level >= x2);
}
public static IObservable<ThresholdCrossedEvent> ThresholdDetection(this IObservable<double> source, IObservable<double[]> thresholdSource)
{
return thresholdSource
.Select(thresholds => source
.Buffer(2, 1)
.Where(x => x.Count == 2)
.Select(x => new { LevelsCrossed = thresholds.GetCrossovers(x[0], x[1]), Previous = x[0], Current = x[1] })
.Where(x => x.LevelsCrossed.Count() > 0)
.SelectMany(x => x.LevelsCrossed.Select(lc => new ThresholdCrossedEvent(lc, x.Previous, x.Current)))
)
.Switch();
}
}
这是对您的代码的直接改编:唯一的区别是外部 Select
、Switch
和输入。对于每个 levels
通知,新的 Select
创建一个新的 Observable。 Switch
表示总是切换到最新的,并丢弃旧的。因此,您始终使用最新的 levels
阈值。
我用 运行 测试了下面的代码:
var feed = new double[] { 1, 2, 3, 5, 5.5, 6, 9, 12, 10, 9, 7.5, 6.5, 7 };
var levels = new double[] { 5, 7, 8 };
var feedSubject = new Subject<double>();
var levelsSubject = new BehaviorSubject<double[]>(levels);
feedSubject
.ThresholdDetection(levelsSubject)
.Subscribe(x => Console.WriteLine(JsonConvert.SerializeObject(x)));
foreach (var d in feed)
feedSubject.OnNext(d);
levelsSubject.OnNext(new double[] { 5, 8 });
Console.WriteLine("---");
foreach (var d in feed)
feedSubject.OnNext(d);
它 运行 使用您的原始阈值通过该馈送阵列一次,然后我 运行 通过它并删除 7 个阈值。我的输出是这样的:
{"Threshold":5.0,"Previous":3.0,"Current":5.0,"SlopeDirection":0,"IsTouching":true}
{"Threshold":5.0,"Previous":5.0,"Current":5.5,"SlopeDirection":0,"IsTouching":true}
{"Threshold":7.0,"Previous":6.0,"Current":9.0,"SlopeDirection":0,"IsTouching":false}
{"Threshold":8.0,"Previous":6.0,"Current":9.0,"SlopeDirection":0,"IsTouching":false}
{"Threshold":8.0,"Previous":9.0,"Current":7.5,"SlopeDirection":1,"IsTouching":false}
{"Threshold":7.0,"Previous":7.5,"Current":6.5,"SlopeDirection":1,"IsTouching":false}
{"Threshold":7.0,"Previous":6.5,"Current":7.0,"SlopeDirection":0,"IsTouching":true}
---
{"Threshold":5.0,"Previous":3.0,"Current":5.0,"SlopeDirection":0,"IsTouching":true}
{"Threshold":5.0,"Previous":5.0,"Current":5.5,"SlopeDirection":0,"IsTouching":true}
{"Threshold":8.0,"Previous":6.0,"Current":9.0,"SlopeDirection":0,"IsTouching":false}
{"Threshold":8.0,"Previous":9.0,"Current":7.5,"SlopeDirection":1,"IsTouching":false}