如何从带有时间戳的数组构建单个预定的 Observable

How to build a single scheduled Observable from an array with timestamps

我有一些代码是从 observable 中缓慢直播的。我想通过另一个可观察对象模拟持久化对象的实时流及其时间戳。

我已经能够在数组上使用 Observable.Return.Delay 来做一些事情,然后合并到一个单独的可观察对象中。我觉得这不是正确的方法,可能每个项目都需要一个线程,所以一旦线程池已满,计时可能会失败。

var data = Enumerable.Range(1, 10);

var observable = data
    .Select((x, idx) => Observable.Return(x).Delay(DateTimeOffset.Now.AddSeconds(idx*3)))
    .Merge();

安排静态数据的最佳方式是什么?

我认为最简单的方法是使用 Observable.Interval 方法。如果使用它进行测试并需要重用它,您可以创建一个方法来处理延迟结果。

static IObservable<TSource> DelayObservable<TSource>(IEnumerable<TSource> source, TimeSpan delay)
{
    var observableSource = source.ToObservable();

    // Use start with to fire one off right away
    var interval = Observable.Interval(delay).StartWith(0);

    // Zip so when the enumerable is done interval is auto unsubscribed
    return interval.Zip(observableSource, (d, S) => S);
}

然后就可以这样调用了

var observable = DelayObservable(Enumerable.Range(1, 10), TimeSpan.FromSeconds(3))

作为 Rx 包的一部分,您可以使用一个方便的 class:Timestamped。不过,无论如何,您不必这样做。这是一个例子:

var timestampedObjects = Enumerable.Range(0, 5)
    .Select(i => Timestamped.Create(i, DateTimeOffset.Now + TimeSpan.FromSeconds(i*i)))
    .ToList();

var observable = timestampedObjects.ToObservable()
    .SelectMany(t => Observable.Timer(t.Timestamp).Take(1).Select(_ => t.Value));

observable.Subscribe(i => Console.WriteLine($"{DateTimeOffset.Now}: {i}"));

除了将 Delay 替换为 Timer 之外,这与您的解决方案几乎相同。 Delay 旨在保留元素之间的 time-gaps 以实现完整的可观察性,但 time-shift 一切。 Timer 用于绝对安排单个项目。

Select 后接 Merge 相当于 SelectMany.

您的解决方案应该可行,我不会担心线程问题,Rx 在 scheduling/combining 线程资源方面相当高效。

另一种选择是包 Microsoft.Reactive.Testing,它允许您在虚拟时间、几小时甚至几年之间安排事情,但 运行 它们是即时的。它非常适合单元测试,但对其他任何东西来说都有些麻烦。

有一个 Generate 的重载,它采用精确的 DateTimeOffset 值来决定何时发出一个项目。 Take a look

签名看起来很复杂,但很简单。您应该确保您使用的时间戳按升序排列,因为 timestamp-function 仅在前面的项目已发出时才进行评估。

你从哪里得到时间戳?你提到了另一个可观察的。如果您可以改进您的示例,我可以尝试生成一些工作代码。

您可以像这样使用 Play 运算符并将您的数据提供给它 - 可能包含 time-shifted(未来)项目。我选择使用 Notification<T>,因此它也将支持完成和错误测试。

public static IObservable<T> Play<T>(this IEnumerable<Timestamped<Notification<T>>> source, IScheduler scheduler = null)
{
    if (scheduler == null) scheduler = Scheduler.Default;

    return Observable.Create<T>(observer => new CompositeDisposable(source
        .Where(tn => tn.Timestamp > scheduler.Now)
        .Select(tn => scheduler.Schedule(tn.Timestamp, () => tn.Value.Accept(observer)))));
}

测试:

var events = Enumerable.Range(0, 10)
    .Select(i => Timestamped.Create(i, DateTimeOffset.Now.AddSeconds(3 * i)))
    .ToList();
var sch = new TestScheduler();
var testee = events
    .Select(e => Timestamped.Create(Notification.CreateOnNext(e.Value), e.Timestamp))
    .Play(sch);
var result = new List<Timestamped<int>>();
using (testee.Timestamp(sch).Subscribe(i => result.Add(i)))
    sch.Start();
Assert.True(result.SequenceEqual(events));

这个问题有很多不同的答案,我不完全确定哪个是最好的解决方案。

我找到了几个看起来相当干净的解决方案...

public static IObservable<TSource> IntervalSeries<TSource>(this IEnumerable<TSource> items, Func<TSource, TimeSpan> delayResolver)
{
    return items.Select(x => Observable.Return(x).DelaySubscription(delayResolver(x))).Concat();
}

public static IObservable<TSource> IntervalSeries<TSource>(this IEnumerable<TSource> items, Func<TSource, TimeSpan> delayResolver)
{
    return items.Select(x => Observable.Empty<TSource>().Delay(delayResolver(x)).Concat(Observable.Return(x))).Concat();
}

Daniel C. Weber to. Although I didn't quite grasp it until I found this 更详细的回答可能是最好的解决方案,感觉它使用了为此设计的实用方法,因此不太可能受到副作用的影响。

public static IObservable<TSource> IntervalSeries<TSource>(this IEnumerable<TSource> items, Func<TSource, TimeSpan> delayResolver)
{
    return Observable.Generate(
        items.GetEnumerator(),
        x => x.MoveNext(),
        x => x,
        x => x.Current,
        x => delayResolver(x.Current));
}