IObservable - 在一段时间内忽略新元素

IObservable - Ignore new elements for a span of time

我正在尝试 "throttle" IObservable(我认为是)标准节流方法的不同方式。
我想忽略流中第一个非忽略值之后的 1s 值。

例如,如果 1s=5 个破折号

source: --1-23--45-----678901234
result: --1-----4------6----1---

关于如何实现这一点有什么想法吗?

这应该可以解决问题。可能会有更短的实现。

Scan中的accumulate存储最后保存的TimestampItem并标记是否Keep每一项

public static IObservable<T> RateLimit<T>(this IObservable<T> source, TimeSpan duration)
{
    return observable
        .Timestamp()
        .Scan(
            new
            {
                Item = default(T),
                Timestamp = DateTimeOffset.MinValue,
                Keep = false
            },
            (a, x) =>
            {
                var keep = a.Timestamp + duration <= x.Timestamp;
                return new
                {
                    Item = x.Value,
                    Timestamp = keep ? x.Timestamp : a.Timestamp,
                    Keep = keep
                };
            }
        })
        .Where(a => a.Keep)
        .Select(a => a.Item);
}

这是在 Rx 中执行此操作的惯用方法,作为扩展方法 - 下面是使用您的场景的解释和示例。

所需函数的工作方式与 Observable.Throttle 非常相似,但符合条件的事件一到达就发出,而不是延迟到节流或采样周期的持续时间。在排位赛后的给定持续时间内,后续事件将被抑制:

public static IObservable<T> SampleFirst<T>(
    this IObservable<T> source,
    TimeSpan sampleDuration,
    IScheduler scheduler = null)
{
    scheduler = scheduler ?? Scheduler.Default;
    return source.Publish(ps => 
        ps.Window(() => ps.Delay(sampleDuration,scheduler))
          .SelectMany(x => x.Take(1)));
}

想法是使用 Window 的重载创建非重叠 windows 使用 windowClosingSelector 使用由 sampleDuration 时移回来的源。因此,每个 window 将:(a) 由其中的第一个元素关闭,并且 (b) 保持打开状态,直到允许使用新元素。然后我们简单地 select 来自每个 window.

的第一个元素

在下面的示例中,我完全重复了您的测试场景,将一个 "dash" 建模为 100 个刻度。请注意,延迟被指定为 499 个滴答而不是 500 个滴答,这是因为在多个调度程序之间传递事件的分辨率导致 1 个滴答漂移 - 实际上,您不需要详述这一点,因为单个滴答分辨率不太可能有意义。 ReactiveTest class 和 OnNext 辅助方法通过包含 Rx 测试框架 nuget 包 rx-testing:

提供
public class Tests : ReactiveTest
{
    public void Scenario()
    {
        var scheduler = new TestScheduler();
        var test = scheduler.CreateHotObservable<int>(    
            // set up events as per the OP scenario
            // using 1 dash = 100 ticks        
            OnNext(200, 1),
            OnNext(400, 2),
            OnNext(500, 3),
            OnNext(800, 4),
            OnNext(900, 5),
            OnNext(1500, 6),
            OnNext(1600, 7),
            OnNext(1700, 8),
            OnNext(1800, 9),
            OnNext(1900, 0),
            OnNext(2000, 1),
            OnNext(2100, 2),
            OnNext(2200, 3),
            OnNext(2300, 4)
            );

        test.SampleFirst(TimeSpan.FromTicks(499), scheduler)
            .Timestamp(scheduler)
            .Subscribe(x =>  Console.WriteLine(
                "Time: {0} Value: {1}", x.Timestamp.Ticks, x.Value));

        scheduler.Start();

    }
}

请注意,输出是根据您的情况得出的:

Time: 200 Value: 1
Time: 800 Value: 4
Time: 1500 Value: 6
Time: 2000 Value: 1