Reactive Extensions 正常运动的变化检测率

Reactive Extensions rate of change detection from normal movement

给定一系列随时间推移呈趋势的数字,我想使用 Reactive Extensions 在出现突然的绝对变化峰值或下降时发出警报。即 101.2, 102.4, 101.4, 100.9, 95, 93, 85... 然后慢慢增加回 100。

警报会在从 1​​00.9 下降到 95 时触发,每个警报都会有一个时间戳来寻找以下形式的警报:

大变化 时间戳 距离 百分比

我认为我需要从 Buffer(60, 1) 开始,以获得 60 个样本的移动平均线(样本之间的频率为一分钟)。

虽然这会给出平均值,但我不能指定任意百分比来触发警报,因为这可能因信号而异 - 一个可能比另一个具有更大的波动性。

为了获得波动率,我会采用更长的历史时间框架 Buffer(14, 1)(这些是同一信号的 14 天日均值)。

然后我会计算缓冲区中的每个值与 14 天平均值之间的差异,平方并添加所有这些偏差,然后除以样本数。

我的问题是:

  1. 我将如何执行上述波动率计算,还是最好在 RX 之外执行此操作并在可观察流计算之外每天更新一次新的波动率值(这可能更有意义以避免我不得不 运行 14 天的 1 分钟样本)?

  2. 我们如何结合快速移动平均线和波动水平(每天更新一次)来发出警报?我在 SO 的帖子上看到了 ScanDistinctUntilChanged,但不知道如何组合。

我会首先将其分解为几个步骤。 (为简单起见,我假设原始数据源是一个名为 values 的可观察对象。)

  1. values 转换为可观察的移动平均线(我们将在此称为 averages)。
  2. valuesaverages 组合成一个可以监视“极端”的可观察对象。

对于第 1 步,您可以使用 Slugart 在评论中提到的内置 Window 方法或类似的 Buffer 方法。 WindowBuffer 之后的 Select 调用可用于将数组处理为单个平均值对象。类似于:

averages = values.Buffer(60, 1)
                 .Select((buffer) => { /* do average and std dev calcuation here */ });

如果您需要滑动 windows,您可能必须实现自己的运算符,但我很容易不知道是否存在这样的运算符。 Scan 如果您需要编写这样的操作符,那么连同队列似乎是一个很好的基础。

对于第 2 步,您可能希望从 CombineLatest 开始,然后是 Where 子句。类似于:

extremes = values.CombineLatest(averages, (v, a) => new { Current = v, Average = a })
           .Where((value) = { /* check if value.Current is out of deviation from value.Average */ });

这种方法的好处在于,您可以选择像我们在这里所做的那样直接从线性值计算平均值,或者选择对其余代码影响最小的波动率信息的其他来源。

请注意,CombineLatest 调用可能导致对值的两种订阅,一种是直接订阅,另一种是通过订阅平均值间接订阅。如果值的底层实现不受欢迎,请使用 Publish 和 RefCount 来解决这个问题。

另请注意,每次 values 或 averages 输出一个值时,CombineLatest 都会输出一个值。这意味着每次平均值更新时您将获得两个事件,一个用于值更新,一个用于由值触发的平均值更新。

如果您使用的是滑动 windows,这将意味着对每个值进行双重更新,最好只在 Scan 输出中包含当前值并完全跳过 CombineLatest。你会得到这样的东西:

averages = values.Scan((v) => { /* build sliding window and attach current value */ });
extremes = averages.Where((a) => { /* check if current value is out of deviation for the window */ });

拥有 extremes 后,您可以订阅它并触发您的提醒。