如何长时间监控 IObservable<double> 是否超出范围
How to monitor IObservable<double> for Out of Range for Extended Period of Time
我有一个 IObservable<double>
以某种固定的间隔提供从传感器读取的值。我想在传感器值长时间超出范围时发出信号。
举个具体的例子,假设可观察对象是一个温度传感器。我想监控温度何时超过 100°C 并持续 5 秒。也就是说,从 IObservable<double>
产生一个观察值或事件,当温度超过 100°C 5 秒时触发一次。如果任何数量的样本的温度读数高于 100°C 的时间少于 5 秒,则这没有影响。在一组样品中的第一个样品全部超过 100°C 5 秒后,它应该发出信号。如果该值继续高于 100°C,则不应重新发出信号,直到温度降至 100°C 以下,然后再次超过它 5 秒。
似乎这对于响应式扩展来说应该很简单,但作为新手,我找不到任何东西。我浏览了 http://reactivex.io and http://introtorx.com 但没有找到任何东西。我可能只是不知道要查找的正确术语。
应该可以通过使用内置运算符(函数式风格)来实现,但是通过使用命令式逻辑实现自定义运算符来实现更加直接。
public static IObservable<Unit> Alarm<T>(this IObservable<T> source,
T threshold, TimeSpan delay, IComparer<T> comparer = null)
{
comparer = comparer ?? Comparer<T>.Default;
return Observable.Create<Unit>(o =>
{
Stopwatch stopwatch = new Stopwatch();
int alarmState = 0; // 0: OK, 1: above threshold, 2: signal transmitted
return source.Subscribe(x =>
{
if (comparer.Compare(x, threshold) >= 0)
{
if (alarmState == 0)
{
alarmState = 1;
stopwatch.Restart();
}
else if (alarmState == 1 && stopwatch.Elapsed >= delay)
{
alarmState = 2;
o.OnNext(Unit.Default);
}
}
else
{
alarmState = 0;
}
}, o.OnError, o.OnCompleted);
});
}
我想了一下,意识到我们可能想多了。
让我们从一个可观察的阈值开始:
var threshold =
source
.Select(temp => temp > 100)
.DistinctUntilChanged();
当 更改 高于或低于 100 时,这会产生一个值。如果它继续高于 100 或低于 100,则不会产生任何新值。
现在让我们定义:
var alarmUp =
threshold
.Throttle(TimeSpan.FromSeconds(5))
.Where(cond => cond == true);
在这里,如果条件在 5 秒内没有改变,throttle
运算符会吐出一个值。现在它可能是 true
(> 100) 持续 5 秒,或 false
(< 100)。
我们只对 true
(> 100) 感兴趣。
在两者之间,如果有任何变化,油门操作器会重置,因此该条件必须至少保持 5 秒。
如果出现新值导致您想忽略任何先前出现的值的结果,则应使用 .Switch()
。
这是您需要的查询:
IObservable<Unit> query =
source
.Select(x => x > 100.0)
.DistinctUntilChanged()
.Select(x => x
? Observable.Timer(TimeSpan.FromSeconds(5.0)).Select(x => Unit.Default)
: Observable.Never<Unit>())
.Switch();
.Select(x => x >= 100.0).DistinctUntilChanged()
组合将源更改为 IObservable<bool>
,仅当传感器 在 x > 100.0
和 [=16= 之间翻转 时才会触发] - true
超过 100°C,100°C 或以下则为假。
现在我们把 IObservable<bool>
变成 IObservable<IObservable<Unit>>
。这是一个产生其他可观察量的可观察量。当我们收到 true
时,我想 return 在 5.0
秒后触发的 IObservable<Unit>
,当我们收到 false
时,我想 return 一个根本没有 return 值的可观察值。
这就是 .Select(x => x ? Observable.Timer(TimeSpan.FromSeconds(5.0)).Select(x => Unit.Default) : Observable.Never<Unit>())
所做的。
最后,我们弹出 .Switch()
,它仅根据最后生成的内部 IObservable<Unit>
生成值,将 IObservable<IObservable<Unit>>
更改为 IObservable<Unit>
。换句话说,如果传感器在 5.0
秒内再次从 100.0
上方翻转到下方,则它会忽略 Observable.Timer(TimeSpan.FromSeconds(5.0))
的值并等待 Observable.Never<Unit>()
的值.如果它超过 100.0
超过 5.0
,那么 Observable.Timer(TimeSpan.FromSeconds(5.0))
会触发,您会从查询中得到一个 Unit
。
这完全符合您的要求。
这是一个稍微简单的查询版本:
IObservable<Unit> query =
source
.Select(x => x > 100.0)
.DistinctUntilChanged()
.Select(x => x
? Observable.Timer(TimeSpan.FromSeconds(5.0))
: Observable.Never<long>())
.Switch()
.Select(x => Unit.Default);
我有一个 IObservable<double>
以某种固定的间隔提供从传感器读取的值。我想在传感器值长时间超出范围时发出信号。
举个具体的例子,假设可观察对象是一个温度传感器。我想监控温度何时超过 100°C 并持续 5 秒。也就是说,从 IObservable<double>
产生一个观察值或事件,当温度超过 100°C 5 秒时触发一次。如果任何数量的样本的温度读数高于 100°C 的时间少于 5 秒,则这没有影响。在一组样品中的第一个样品全部超过 100°C 5 秒后,它应该发出信号。如果该值继续高于 100°C,则不应重新发出信号,直到温度降至 100°C 以下,然后再次超过它 5 秒。
似乎这对于响应式扩展来说应该很简单,但作为新手,我找不到任何东西。我浏览了 http://reactivex.io and http://introtorx.com 但没有找到任何东西。我可能只是不知道要查找的正确术语。
应该可以通过使用内置运算符(函数式风格)来实现,但是通过使用命令式逻辑实现自定义运算符来实现更加直接。
public static IObservable<Unit> Alarm<T>(this IObservable<T> source,
T threshold, TimeSpan delay, IComparer<T> comparer = null)
{
comparer = comparer ?? Comparer<T>.Default;
return Observable.Create<Unit>(o =>
{
Stopwatch stopwatch = new Stopwatch();
int alarmState = 0; // 0: OK, 1: above threshold, 2: signal transmitted
return source.Subscribe(x =>
{
if (comparer.Compare(x, threshold) >= 0)
{
if (alarmState == 0)
{
alarmState = 1;
stopwatch.Restart();
}
else if (alarmState == 1 && stopwatch.Elapsed >= delay)
{
alarmState = 2;
o.OnNext(Unit.Default);
}
}
else
{
alarmState = 0;
}
}, o.OnError, o.OnCompleted);
});
}
我想了一下,意识到我们可能想多了。
让我们从一个可观察的阈值开始:
var threshold =
source
.Select(temp => temp > 100)
.DistinctUntilChanged();
当 更改 高于或低于 100 时,这会产生一个值。如果它继续高于 100 或低于 100,则不会产生任何新值。
现在让我们定义:
var alarmUp =
threshold
.Throttle(TimeSpan.FromSeconds(5))
.Where(cond => cond == true);
在这里,如果条件在 5 秒内没有改变,throttle
运算符会吐出一个值。现在它可能是 true
(> 100) 持续 5 秒,或 false
(< 100)。
我们只对 true
(> 100) 感兴趣。
在两者之间,如果有任何变化,油门操作器会重置,因此该条件必须至少保持 5 秒。
如果出现新值导致您想忽略任何先前出现的值的结果,则应使用 .Switch()
。
这是您需要的查询:
IObservable<Unit> query =
source
.Select(x => x > 100.0)
.DistinctUntilChanged()
.Select(x => x
? Observable.Timer(TimeSpan.FromSeconds(5.0)).Select(x => Unit.Default)
: Observable.Never<Unit>())
.Switch();
.Select(x => x >= 100.0).DistinctUntilChanged()
组合将源更改为 IObservable<bool>
,仅当传感器 在 x > 100.0
和 [=16= 之间翻转 时才会触发] - true
超过 100°C,100°C 或以下则为假。
现在我们把 IObservable<bool>
变成 IObservable<IObservable<Unit>>
。这是一个产生其他可观察量的可观察量。当我们收到 true
时,我想 return 在 5.0
秒后触发的 IObservable<Unit>
,当我们收到 false
时,我想 return 一个根本没有 return 值的可观察值。
这就是 .Select(x => x ? Observable.Timer(TimeSpan.FromSeconds(5.0)).Select(x => Unit.Default) : Observable.Never<Unit>())
所做的。
最后,我们弹出 .Switch()
,它仅根据最后生成的内部 IObservable<Unit>
生成值,将 IObservable<IObservable<Unit>>
更改为 IObservable<Unit>
。换句话说,如果传感器在 5.0
秒内再次从 100.0
上方翻转到下方,则它会忽略 Observable.Timer(TimeSpan.FromSeconds(5.0))
的值并等待 Observable.Never<Unit>()
的值.如果它超过 100.0
超过 5.0
,那么 Observable.Timer(TimeSpan.FromSeconds(5.0))
会触发,您会从查询中得到一个 Unit
。
这完全符合您的要求。
这是一个稍微简单的查询版本:
IObservable<Unit> query =
source
.Select(x => x > 100.0)
.DistinctUntilChanged()
.Select(x => x
? Observable.Timer(TimeSpan.FromSeconds(5.0))
: Observable.Never<long>())
.Switch()
.Select(x => Unit.Default);