Reactive Extensions (Rx) - 当间隔中没有值时,使用最后一个已知值进行采样

Reactive Extensions (Rx) - sample with last known value when no value is present in interval

我有一个可观察流,它以不一致的间隔生成值,如下所示:

------1---2------3----------------4--------------5---

我想对此进行采样,但在生成 a 值后没有任何空样本:

------1---2------3----------------4--------------5-----

----_----1----2----3----3----3----4----4----4----5----5

我显然认为可以在此处使用 Replay().RefCount() 来为 Sample() 提供最后一个已知值,但由于它没有重新订阅源流,所以没有成功。

关于如何做到这一点有什么想法吗?

假设您的源流是 IObservable<int> xs 那么您的采样间隔是 Timespan duration 那么:

xs.Publish(ps => 
    Observable.Interval(duration)
        .Zip(ps.MostRecent(0), (x,y) => y)
        .SkipUntil(ps))

对于通用解决方案,将 MostRecent0 参数替换为 default(T),其中 IObservable<T> 是源流类型。

Publish 的目的是防止订阅副作用,因为我们需要订阅源两次 - 一次订阅 MostRecent,一次订阅 SkipUntil。后者的目的是防止采样值直到源流的第一个事件。

如果您不关心在源流的第一个事件之前获取默认值,您可以简化此操作:

Observable.Interval(duration)
    .Zip(xs.MostRecent(0), (x,y) => y)

相关运算符 WithLatestFrom 可能也很有趣;这将在下一个版本中出现在 Rx 中。有关详细信息,请参阅 here