如何长时间监控 IObservable<double> 是否超出范围

How to monitor IObservable<double> for Out of Range for Extended Period of Time

我有一个 IObservable<double> 以某种固定的间隔提供从传感器读取的值。我想在传感器值长时间超出范围时发出信号。

举个具体的例子,假设可观察对象是一个温度传感器。我想监控温度何时超过 100°C 并持续 5 秒。也就是说,从 IObservable<double> 产生一个观察值或事件,当温度超过 100°C 5 秒时触发一次。如果任何数量的样本的温度读数高于 100°C 的时间少于 5 秒,则这没有影响。在一组样品中的第一个样品全部超过 100°C 5 秒后,它应该发出信号。如果该值继续高于 100°C,则不应重新发出信号,直到温度降至 100°C 以下,然后再次超过它 5 秒。

似乎这对于响应式扩展来说应该很简单,但作为新手,我找不到任何东西。我浏览了 http://reactivex.io and http://introtorx.com 但没有找到任何东西。我可能只是不知道要查找的正确术语。

应该可以通过使用内置运算符(函数式风格)来实现,但是通过使用命令式逻辑实现自定义运算符来实现更加直接。

public static IObservable<Unit> Alarm<T>(this IObservable<T> source,
    T threshold, TimeSpan delay, IComparer<T> comparer = null)
{
    comparer = comparer ?? Comparer<T>.Default;
    return Observable.Create<Unit>(o =>
    {
        Stopwatch stopwatch = new Stopwatch();
        int alarmState = 0; // 0: OK, 1: above threshold, 2: signal transmitted

        return source.Subscribe(x =>
        {
            if (comparer.Compare(x, threshold) >= 0)
            {
                if (alarmState == 0)
                {
                    alarmState = 1;
                    stopwatch.Restart();
                }
                else if (alarmState == 1 && stopwatch.Elapsed >= delay)
                {
                    alarmState = 2;
                    o.OnNext(Unit.Default);
                }
            }
            else
            {
                alarmState = 0;
            }
        }, o.OnError, o.OnCompleted);
    });
}

我想了一下,意识到我们可能想多了。

让我们从一个可观察的阈值开始:

var threshold = 
source
.Select(temp => temp > 100)
.DistinctUntilChanged();

更改 高于或低于 100 时,这会产生一个值。如果它继续高于 100 或低于 100,则不会产生任何新值。

现在让我们定义:

var alarmUp =
    threshold
    .Throttle(TimeSpan.FromSeconds(5))
    .Where(cond => cond == true);

在这里,如果条件在 5 秒内没有改变,throttle 运算符会吐出一个值。现在它可能是 true (> 100) 持续 5 秒,或 false (< 100)。 我们只对 true (> 100) 感兴趣。

在两者之间,如果有任何变化,油门操作器会重置,因此该条件必须至少保持 5 秒。

如果出现新值导致您想忽略任何先前出现的值的结果,则应使用 .Switch()

这是您需要的查询:

IObservable<Unit> query =
    source
        .Select(x => x > 100.0)
        .DistinctUntilChanged()
        .Select(x => x
            ? Observable.Timer(TimeSpan.FromSeconds(5.0)).Select(x => Unit.Default)
            : Observable.Never<Unit>())
        .Switch();

.Select(x => x >= 100.0).DistinctUntilChanged() 组合将源更改为 IObservable<bool>,仅当传感器 x > 100.0 和 [=16= 之间翻转 时才会触发] - true 超过 100°C,100°C 或以下则为假。

现在我们把 IObservable<bool> 变成 IObservable<IObservable<Unit>>。这是一个产生其他可观察量的可观察量。当我们收到 true 时,我想 return 在 5.0 秒后触发的 IObservable<Unit>,当我们收到 false 时,我想 return 一个根本没有 return 值的可观察值。

这就是 .Select(x => x ? Observable.Timer(TimeSpan.FromSeconds(5.0)).Select(x => Unit.Default) : Observable.Never<Unit>()) 所做的。

最后,我们弹出 .Switch(),它仅根据最后生成的内部 IObservable<Unit> 生成值,将 IObservable<IObservable<Unit>> 更改为 IObservable<Unit>。换句话说,如果传感器在 5.0 秒内再次从 100.0 上方翻转到下方,则它会忽略 Observable.Timer(TimeSpan.FromSeconds(5.0)) 的值并等待 Observable.Never<Unit>() 的值.如果它超过 100.0 超过 5.0,那么 Observable.Timer(TimeSpan.FromSeconds(5.0)) 会触发,您会从查询中得到一个 Unit

这完全符合您的要求。

这是一个稍微简单的查询版本:

IObservable<Unit> query =
    source
        .Select(x => x > 100.0)
        .DistinctUntilChanged()
        .Select(x => x
            ? Observable.Timer(TimeSpan.FromSeconds(5.0))
            : Observable.Never<long>())
        .Switch()
        .Select(x => Unit.Default);