Reactive Extensions (Rx) - 当间隔中没有值时,使用最后一个已知值进行采样
Reactive Extensions (Rx) - sample with last known value when no value is present in interval
我有一个可观察流,它以不一致的间隔生成值,如下所示:
------1---2------3----------------4--------------5---
我想对此进行采样,但在生成 a 值后没有任何空样本:
------1---2------3----------------4--------------5-----
----_----1----2----3----3----3----4----4----4----5----5
我显然认为可以在此处使用 Replay().RefCount()
来为 Sample()
提供最后一个已知值,但由于它没有重新订阅源流,所以没有成功。
关于如何做到这一点有什么想法吗?
假设您的源流是 IObservable<int> xs
那么您的采样间隔是 Timespan duration
那么:
xs.Publish(ps =>
Observable.Interval(duration)
.Zip(ps.MostRecent(0), (x,y) => y)
.SkipUntil(ps))
对于通用解决方案,将 MostRecent
的 0
参数替换为 default(T)
,其中 IObservable<T>
是源流类型。
Publish
的目的是防止订阅副作用,因为我们需要订阅源两次 - 一次订阅 MostRecent
,一次订阅 SkipUntil
。后者的目的是防止采样值直到源流的第一个事件。
如果您不关心在源流的第一个事件之前获取默认值,您可以简化此操作:
Observable.Interval(duration)
.Zip(xs.MostRecent(0), (x,y) => y)
相关运算符 WithLatestFrom
可能也很有趣;这将在下一个版本中出现在 Rx 中。有关详细信息,请参阅 here。
我有一个可观察流,它以不一致的间隔生成值,如下所示:
------1---2------3----------------4--------------5---
我想对此进行采样,但在生成 a 值后没有任何空样本:
------1---2------3----------------4--------------5-----
----_----1----2----3----3----3----4----4----4----5----5
我显然认为可以在此处使用 Replay().RefCount()
来为 Sample()
提供最后一个已知值,但由于它没有重新订阅源流,所以没有成功。
关于如何做到这一点有什么想法吗?
假设您的源流是 IObservable<int> xs
那么您的采样间隔是 Timespan duration
那么:
xs.Publish(ps =>
Observable.Interval(duration)
.Zip(ps.MostRecent(0), (x,y) => y)
.SkipUntil(ps))
对于通用解决方案,将 MostRecent
的 0
参数替换为 default(T)
,其中 IObservable<T>
是源流类型。
Publish
的目的是防止订阅副作用,因为我们需要订阅源两次 - 一次订阅 MostRecent
,一次订阅 SkipUntil
。后者的目的是防止采样值直到源流的第一个事件。
如果您不关心在源流的第一个事件之前获取默认值,您可以简化此操作:
Observable.Interval(duration)
.Zip(xs.MostRecent(0), (x,y) => y)
相关运算符 WithLatestFrom
可能也很有趣;这将在下一个版本中出现在 Rx 中。有关详细信息,请参阅 here。