如何让事件至少间隔一个给定的时间跨度?

How to have events separated at least by a given time span?

我希望输出序列的事件尽快发生,但不在从最近事件开始的 window N 秒内发生。

这是一个大理石图,假设我想在事件之间至少分隔三个破折号:

Input:  a-------b-cd-----e---------f-----g-h
Result: a-------b---c---d---e------f-----g---h

签名为:

IObservable<T> Separate<T>(this IObservable<T> source, TimeSpan separation);

简要说明:

  • Timestamp():为每个事件添加时间戳。
  • Scan():此函数聚合类似于 Aggregate(),但生成部分聚合值的序列,而不仅仅是最终项目。它用于确定每个事件的所需时间戳,同时考虑到最后所需的时间戳,以及与原始时间戳的延迟。
  • Delay():这本身执行延迟(感谢 https://twitter.com/AzazelN28,不知道这个过载!)
  • Select(): 重新获取原始值。
    public static IObservable<T> Separate<T>(this IObservable<T> source, TimeSpan separation)
    {
        return source
            .Timestamp()
            .Scan(
                new {
                    value = default(T),
                    time = DateTimeOffset.MinValue,
                    delay = TimeSpan.Zero },
                (acc, item) =>
                {
                    var time = 
                        item.Timestamp - acc.time >= separation
                        ? item.Timestamp
                        : acc.time.Add(separation);
                    return new
                    {
                        value = item.Value,
                        time,
                        delay = time - item.Timestamp
                    };
                })
            .Delay(x => Observable.Timer(x.delay))
            .Select(x => x.value);
    }

感谢您提出一个非常有趣的问题。我试了一下 - 开始安排未来的行动 - 虽然我设法达到了预期的输出,但我的解决方案存在重大问题。

你的更干净,但是……嗯……错了。嗯,略;0)

我首先使用 Microsoft 的 TestScheduler 编写了以下测试装置:

[Fact]
public void MatchExpected()
{
    TestScheduler scheduler = new TestScheduler();

    // 0        1         2         3         4 
    // 1234567890123456789012345678901234567890
    // a-------b-cd-----e---------f-----ghX     <- Input
    IObservable<char> input = scheduler.CreateColdObservable(
        ReactiveTest.OnNext(1, 'a'),
        ReactiveTest.OnNext(9, 'b'),
        ReactiveTest.OnNext(11, 'c'),
        ReactiveTest.OnNext(12, 'd'),
        ReactiveTest.OnNext(18, 'e'),
        ReactiveTest.OnNext(28, 'f'),
        ReactiveTest.OnNext(34, 'g'),
        ReactiveTest.OnNext(35, 'h'),
        ReactiveTest.OnCompleted<char>(36)
    );

    // 0        1         2         3         4 
    // 1234567890123456789012345678901234567890
    // a-------b-cd-----e---------f-----ghX     <- Input
    // a-------b---c---d---e------f-----g---hX  <- Expected
    var expected = new []
    {
        ReactiveTest.OnNext(ReactiveTest.Subscribed + 1, 'a'),
        ReactiveTest.OnNext(ReactiveTest.Subscribed + 9, 'b'),
        ReactiveTest.OnNext(ReactiveTest.Subscribed + 13, 'c'),
        ReactiveTest.OnNext(ReactiveTest.Subscribed + 17, 'd'),
        ReactiveTest.OnNext(ReactiveTest.Subscribed + 21, 'e'),
        ReactiveTest.OnNext(ReactiveTest.Subscribed + 28, 'f'),
        ReactiveTest.OnNext(ReactiveTest.Subscribed + 34, 'g'),
        ReactiveTest.OnNext(ReactiveTest.Subscribed + 38, 'h'),
        ReactiveTest.OnCompleted<char>(ReactiveTest.Subscribed + 38)
    };            

    var actual = scheduler.Start(() => input.Separate(TimeSpan.FromTicks(4), scheduler), ReactiveTest.Subscribed + 40);

    Assert.Equal(expected, actual.Messages.ToArray());
}

在此您可以看到输入和预期输出的弹珠图(使用您原来的破折号表示法)。不幸的是,在使用您的实现时,您会收到以下输出:

// 0        1         2         3         4 
// 1234567890123456789012345678901234567890
// a-------b-cd-----e---------f-----ghX     <- Input
// a-------b---c---d---e------f-----g---hX  <- Expected
// -a-------b--c---d---e-------f-----g--hX  <- Actual

你看,使用 observable 来结束延迟的 Delay 重载在 observable 可以发出值之前需要调度程序上的时间。不幸的是,在应该立即发出值的情况下 (x.delay == TimeSpan.Zero),由于通过调度程序循环,它实际上会晚一点发出。

因为我有测试夹具而你有可行的解决方案,我想我会 post 支持一个更正后的版本,如下所示:

public static IObservable<T> Separate<T>(this IObservable<T> source, TimeSpan separation, IScheduler scheduler)
{
    return Observable.Create<T>(
        observer =>
        {
            var timedSource = source
                .Timestamp(scheduler)
                .Scan(
                    new
                    {
                        value = default(T),
                        time = DateTimeOffset.MinValue,
                        delay = TimeSpan.Zero
                    },
                    (acc, item) =>
                    {
                        var time =
                            item.Timestamp - acc.time >= separation
                            ? item.Timestamp
                            : acc.time.Add(separation);
                        return new
                        {
                            value = item.Value,
                            time,
                            delay = time - item.Timestamp
                        };
                    })
                .Publish();

            var combinedSource = Observable.Merge(
                timedSource.Where(x => x.delay == TimeSpan.Zero),
                timedSource.Where(x => x.delay > TimeSpan.Zero).Delay(x => Observable.Timer(x.delay, scheduler))
            );

            return new CompositeDisposable(
                combinedSource.Select(x => x.value).Subscribe(observer),
                timedSource.Connect()
            );
        }
    );
}

它提供了预期的输出:

// 0        1         2         3         4 
// 1234567890123456789012345678901234567890
// a-------b-cd-----e---------f-----ghX     <- Input
// a-------b---c---d---e------f-----g---hX  <- Expected
// a-------b---c---d---e------f-----g---hX  <- Actual

请注意 IScheduler 参数的添加,它会在整个操作员代码中使用。在 Rx 中实现任何可能引入并发性的运算符(就像这个一样)时,这是一个很好的做法,它允许您编写(非常严格的)测试!

好了。希望对您有所帮助:0)

严重依赖@ibebbs 的回答,我已经使用测试来查看是否有更简单的方法。

就在我编写代码之前,我看到了一些已编码到测试中的假设。但是,我不知道是否需要这些断言。特别是 OnCompleted 时间。 @ibebbs 断言 OnCompleted 应该出现在与最后一个值相同的帧中。 OP没有提出这样的要求。

如果这不是必需的,那么您可以采用完全不同的方法。

当我看到你的大理石图时,我从输入到结果的心理翻译如下

Input:  a-------b-cd-----e---------f-----g-h
        a---|
                b--|
                    c--|
                        d--|
                            e--|
Result: a-------b---c---d---e------f-----g---h

即每个值都被投影到一个新的具有长尾的单值序列。也就是说,直到给定的缓冲时间,它才会完成。这使得代码就像从单个值到具有单个值和延迟完成的序列的投影一样简单。然后你只需将所有这些迷你序列连接在一起[=13​​=]

public static IObservable<T> Separate<T>(this IObservable<T> source, TimeSpan separation, IScheduler scheduler)
{
    var delayedEmpty = Observable.Empty<T>().Delay(separation, scheduler);
    return source.Select(s=>
            Observable.Return(s).Concat(delayedEmpty)
        ).Concat();
}

这将解决 OP,但是您也会在完成序列时获得与为每个值获得的相同的缓冲区。