如何从带有时间戳的数组构建单个预定的 Observable
How to build a single scheduled Observable from an array with timestamps
我有一些代码是从 observable 中缓慢直播的。我想通过另一个可观察对象模拟持久化对象的实时流及其时间戳。
我已经能够在数组上使用 Observable.Return.Delay 来做一些事情,然后合并到一个单独的可观察对象中。我觉得这不是正确的方法,可能每个项目都需要一个线程,所以一旦线程池已满,计时可能会失败。
var data = Enumerable.Range(1, 10);
var observable = data
.Select((x, idx) => Observable.Return(x).Delay(DateTimeOffset.Now.AddSeconds(idx*3)))
.Merge();
安排静态数据的最佳方式是什么?
我认为最简单的方法是使用 Observable.Interval 方法。如果使用它进行测试并需要重用它,您可以创建一个方法来处理延迟结果。
static IObservable<TSource> DelayObservable<TSource>(IEnumerable<TSource> source, TimeSpan delay)
{
var observableSource = source.ToObservable();
// Use start with to fire one off right away
var interval = Observable.Interval(delay).StartWith(0);
// Zip so when the enumerable is done interval is auto unsubscribed
return interval.Zip(observableSource, (d, S) => S);
}
然后就可以这样调用了
var observable = DelayObservable(Enumerable.Range(1, 10), TimeSpan.FromSeconds(3))
作为 Rx 包的一部分,您可以使用一个方便的 class:Timestamped
。不过,无论如何,您不必这样做。这是一个例子:
var timestampedObjects = Enumerable.Range(0, 5)
.Select(i => Timestamped.Create(i, DateTimeOffset.Now + TimeSpan.FromSeconds(i*i)))
.ToList();
var observable = timestampedObjects.ToObservable()
.SelectMany(t => Observable.Timer(t.Timestamp).Take(1).Select(_ => t.Value));
observable.Subscribe(i => Console.WriteLine($"{DateTimeOffset.Now}: {i}"));
除了将 Delay
替换为 Timer
之外,这与您的解决方案几乎相同。 Delay
旨在保留元素之间的 time-gaps 以实现完整的可观察性,但 time-shift 一切。 Timer
用于绝对安排单个项目。
Select
后接 Merge
相当于 SelectMany
.
您的解决方案应该可行,我不会担心线程问题,Rx 在 scheduling/combining 线程资源方面相当高效。
另一种选择是包 Microsoft.Reactive.Testing
,它允许您在虚拟时间、几小时甚至几年之间安排事情,但 运行 它们是即时的。它非常适合单元测试,但对其他任何东西来说都有些麻烦。
有一个 Generate
的重载,它采用精确的 DateTimeOffset
值来决定何时发出一个项目。 Take a look。
签名看起来很复杂,但很简单。您应该确保您使用的时间戳按升序排列,因为 timestamp-function 仅在前面的项目已发出时才进行评估。
你从哪里得到时间戳?你提到了另一个可观察的。如果您可以改进您的示例,我可以尝试生成一些工作代码。
您可以像这样使用 Play
运算符并将您的数据提供给它 - 可能包含 time-shifted(未来)项目。我选择使用 Notification<T>
,因此它也将支持完成和错误测试。
public static IObservable<T> Play<T>(this IEnumerable<Timestamped<Notification<T>>> source, IScheduler scheduler = null)
{
if (scheduler == null) scheduler = Scheduler.Default;
return Observable.Create<T>(observer => new CompositeDisposable(source
.Where(tn => tn.Timestamp > scheduler.Now)
.Select(tn => scheduler.Schedule(tn.Timestamp, () => tn.Value.Accept(observer)))));
}
测试:
var events = Enumerable.Range(0, 10)
.Select(i => Timestamped.Create(i, DateTimeOffset.Now.AddSeconds(3 * i)))
.ToList();
var sch = new TestScheduler();
var testee = events
.Select(e => Timestamped.Create(Notification.CreateOnNext(e.Value), e.Timestamp))
.Play(sch);
var result = new List<Timestamped<int>>();
using (testee.Timestamp(sch).Subscribe(i => result.Add(i)))
sch.Start();
Assert.True(result.SequenceEqual(events));
这个问题有很多不同的答案,我不完全确定哪个是最好的解决方案。
我找到了几个看起来相当干净的解决方案...
public static IObservable<TSource> IntervalSeries<TSource>(this IEnumerable<TSource> items, Func<TSource, TimeSpan> delayResolver)
{
return items.Select(x => Observable.Return(x).DelaySubscription(delayResolver(x))).Concat();
}
public static IObservable<TSource> IntervalSeries<TSource>(this IEnumerable<TSource> items, Func<TSource, TimeSpan> delayResolver)
{
return items.Select(x => Observable.Empty<TSource>().Delay(delayResolver(x)).Concat(Observable.Return(x))).Concat();
}
Daniel C. Weber to. Although I didn't quite grasp it until I found this 更详细的回答可能是最好的解决方案,感觉它使用了为此设计的实用方法,因此不太可能受到副作用的影响。
public static IObservable<TSource> IntervalSeries<TSource>(this IEnumerable<TSource> items, Func<TSource, TimeSpan> delayResolver)
{
return Observable.Generate(
items.GetEnumerator(),
x => x.MoveNext(),
x => x,
x => x.Current,
x => delayResolver(x.Current));
}
我有一些代码是从 observable 中缓慢直播的。我想通过另一个可观察对象模拟持久化对象的实时流及其时间戳。
我已经能够在数组上使用 Observable.Return.Delay 来做一些事情,然后合并到一个单独的可观察对象中。我觉得这不是正确的方法,可能每个项目都需要一个线程,所以一旦线程池已满,计时可能会失败。
var data = Enumerable.Range(1, 10);
var observable = data
.Select((x, idx) => Observable.Return(x).Delay(DateTimeOffset.Now.AddSeconds(idx*3)))
.Merge();
安排静态数据的最佳方式是什么?
我认为最简单的方法是使用 Observable.Interval 方法。如果使用它进行测试并需要重用它,您可以创建一个方法来处理延迟结果。
static IObservable<TSource> DelayObservable<TSource>(IEnumerable<TSource> source, TimeSpan delay)
{
var observableSource = source.ToObservable();
// Use start with to fire one off right away
var interval = Observable.Interval(delay).StartWith(0);
// Zip so when the enumerable is done interval is auto unsubscribed
return interval.Zip(observableSource, (d, S) => S);
}
然后就可以这样调用了
var observable = DelayObservable(Enumerable.Range(1, 10), TimeSpan.FromSeconds(3))
作为 Rx 包的一部分,您可以使用一个方便的 class:Timestamped
。不过,无论如何,您不必这样做。这是一个例子:
var timestampedObjects = Enumerable.Range(0, 5)
.Select(i => Timestamped.Create(i, DateTimeOffset.Now + TimeSpan.FromSeconds(i*i)))
.ToList();
var observable = timestampedObjects.ToObservable()
.SelectMany(t => Observable.Timer(t.Timestamp).Take(1).Select(_ => t.Value));
observable.Subscribe(i => Console.WriteLine($"{DateTimeOffset.Now}: {i}"));
除了将 Delay
替换为 Timer
之外,这与您的解决方案几乎相同。 Delay
旨在保留元素之间的 time-gaps 以实现完整的可观察性,但 time-shift 一切。 Timer
用于绝对安排单个项目。
Select
后接 Merge
相当于 SelectMany
.
您的解决方案应该可行,我不会担心线程问题,Rx 在 scheduling/combining 线程资源方面相当高效。
另一种选择是包 Microsoft.Reactive.Testing
,它允许您在虚拟时间、几小时甚至几年之间安排事情,但 运行 它们是即时的。它非常适合单元测试,但对其他任何东西来说都有些麻烦。
有一个 Generate
的重载,它采用精确的 DateTimeOffset
值来决定何时发出一个项目。 Take a look。
签名看起来很复杂,但很简单。您应该确保您使用的时间戳按升序排列,因为 timestamp-function 仅在前面的项目已发出时才进行评估。
你从哪里得到时间戳?你提到了另一个可观察的。如果您可以改进您的示例,我可以尝试生成一些工作代码。
您可以像这样使用 Play
运算符并将您的数据提供给它 - 可能包含 time-shifted(未来)项目。我选择使用 Notification<T>
,因此它也将支持完成和错误测试。
public static IObservable<T> Play<T>(this IEnumerable<Timestamped<Notification<T>>> source, IScheduler scheduler = null)
{
if (scheduler == null) scheduler = Scheduler.Default;
return Observable.Create<T>(observer => new CompositeDisposable(source
.Where(tn => tn.Timestamp > scheduler.Now)
.Select(tn => scheduler.Schedule(tn.Timestamp, () => tn.Value.Accept(observer)))));
}
测试:
var events = Enumerable.Range(0, 10)
.Select(i => Timestamped.Create(i, DateTimeOffset.Now.AddSeconds(3 * i)))
.ToList();
var sch = new TestScheduler();
var testee = events
.Select(e => Timestamped.Create(Notification.CreateOnNext(e.Value), e.Timestamp))
.Play(sch);
var result = new List<Timestamped<int>>();
using (testee.Timestamp(sch).Subscribe(i => result.Add(i)))
sch.Start();
Assert.True(result.SequenceEqual(events));
这个问题有很多不同的答案,我不完全确定哪个是最好的解决方案。
我找到了几个看起来相当干净的解决方案...
public static IObservable<TSource> IntervalSeries<TSource>(this IEnumerable<TSource> items, Func<TSource, TimeSpan> delayResolver)
{
return items.Select(x => Observable.Return(x).DelaySubscription(delayResolver(x))).Concat();
}
public static IObservable<TSource> IntervalSeries<TSource>(this IEnumerable<TSource> items, Func<TSource, TimeSpan> delayResolver)
{
return items.Select(x => Observable.Empty<TSource>().Delay(delayResolver(x)).Concat(Observable.Return(x))).Concat();
}
Daniel C. Weber
public static IObservable<TSource> IntervalSeries<TSource>(this IEnumerable<TSource> items, Func<TSource, TimeSpan> delayResolver)
{
return Observable.Generate(
items.GetEnumerator(),
x => x.MoveNext(),
x => x,
x => x.Current,
x => delayResolver(x.Current));
}