使用 TestScheduler 测试 IConnectableObservable
Testing an IConnectableObservable with the TestScheduler
好吧,已经晚了,但我终究无法弄清楚为什么会发生以下情况。
我正在尝试测试以下内容(已简化)IConnectableObservable<long>
:
private const int PollingIntervalMinutes = 5;
private IConnectableObservable<long> CreateObservable(IScheduler scheduler)
{
return Observable
.Interval(TimeSpan.FromMinutes(PollingIntervalMinutes), scheduler)
.StartWith(0)
.Publish();
}
如果我测试它 "long hand" 测试通过:
[Test]
public void ShouldReturnExpectedNumberOfMessagesLongHand()
{
var scheduler = new TestScheduler();
var observed = scheduler.CreateObserver<long>();
var observable = CreateObservable(scheduler);
observable.Subscribe(observed);
observable.Connect();
Assert.That(observed.Messages.Count, Is.EqualTo(1));
scheduler.AdvanceBy(TimeSpan.FromMinutes(PollingIntervalMinutes).Ticks);
Assert.That(observed.Messages.Count, Is.EqualTo(2));
scheduler.AdvanceBy(TimeSpan.FromMinutes(PollingIntervalMinutes).Ticks);
Assert.That(observed.Messages.Count, Is.EqualTo(3));
scheduler.AdvanceBy(TimeSpan.FromMinutes(PollingIntervalMinutes).Ticks);
Assert.That(observed.Messages.Count, Is.EqualTo(4));
}
但是,如果我使用 TestScheduler.Start
方法 - 如下 - 测试挂起并且永远不会到达 Assert
:
[Test]
public void ShouldReturnExpectedNumberOfMessages()
{
var scheduler = new TestScheduler();
var observable = CreateObservable(scheduler);
var observed = scheduler.Start(() => { observable.Connect(); return observable; }, TimeSpan.FromMinutes(PollingIntervalMinutes * 3).Ticks);
Assert.That(observed.Messages.Count, Is.EqualTo(4));
}
通过在可观察对象中放置一个断点(即在额外的 Select
或 Do
上)我可以看到对 scheduler.Start
的调用导致底层可观察对象旋转(即击中断点数千次)而不是遵守预定时间。
我尝试了各种不同的方法来调用 IConnectableObservable
上的 Connect
(即在调用开始之前连接,在 TestScheduler 中安排对 Connect 的调用等)但无济于事。
它肯定与测试 IConnectableObservable 相关,因为删除 Publish
(即使其成为普通的冷可观察对象)使测试通过。
完整性检查and/or建议将不胜感激。
未处置的出版商再次罢工。
通常的嫌疑人:
var observable = CreateObservable(scheduler);
scheduler.Start(() => { observable.Connect(); return observable; }, ...
要实际处理间隔计时器,您需要一种方法来处理来自 observable.Connect()
的订阅,而不是 Start
方法的订阅。
连接后,您的时间间隔会使用测试调度程序(尽可能快地)启动项目,而取消订阅实际上不会做任何事情,将其保留 运行 - 测试调度程序将从未完成。
通常,确保资源处置的一种方法是使用 Using
。
scheduler.Start(() => Observable.Using(() => observable.Connect(), _ => observable), ...
但是,当取消订阅下游可观察对象时,确保释放原始连接的更简单方法是使用 RefCount
。
scheduler.Start(() => CreateObservable(scheduler).RefCount(), ...
好吧,已经晚了,但我终究无法弄清楚为什么会发生以下情况。
我正在尝试测试以下内容(已简化)IConnectableObservable<long>
:
private const int PollingIntervalMinutes = 5;
private IConnectableObservable<long> CreateObservable(IScheduler scheduler)
{
return Observable
.Interval(TimeSpan.FromMinutes(PollingIntervalMinutes), scheduler)
.StartWith(0)
.Publish();
}
如果我测试它 "long hand" 测试通过:
[Test]
public void ShouldReturnExpectedNumberOfMessagesLongHand()
{
var scheduler = new TestScheduler();
var observed = scheduler.CreateObserver<long>();
var observable = CreateObservable(scheduler);
observable.Subscribe(observed);
observable.Connect();
Assert.That(observed.Messages.Count, Is.EqualTo(1));
scheduler.AdvanceBy(TimeSpan.FromMinutes(PollingIntervalMinutes).Ticks);
Assert.That(observed.Messages.Count, Is.EqualTo(2));
scheduler.AdvanceBy(TimeSpan.FromMinutes(PollingIntervalMinutes).Ticks);
Assert.That(observed.Messages.Count, Is.EqualTo(3));
scheduler.AdvanceBy(TimeSpan.FromMinutes(PollingIntervalMinutes).Ticks);
Assert.That(observed.Messages.Count, Is.EqualTo(4));
}
但是,如果我使用 TestScheduler.Start
方法 - 如下 - 测试挂起并且永远不会到达 Assert
:
[Test]
public void ShouldReturnExpectedNumberOfMessages()
{
var scheduler = new TestScheduler();
var observable = CreateObservable(scheduler);
var observed = scheduler.Start(() => { observable.Connect(); return observable; }, TimeSpan.FromMinutes(PollingIntervalMinutes * 3).Ticks);
Assert.That(observed.Messages.Count, Is.EqualTo(4));
}
通过在可观察对象中放置一个断点(即在额外的 Select
或 Do
上)我可以看到对 scheduler.Start
的调用导致底层可观察对象旋转(即击中断点数千次)而不是遵守预定时间。
我尝试了各种不同的方法来调用 IConnectableObservable
上的 Connect
(即在调用开始之前连接,在 TestScheduler 中安排对 Connect 的调用等)但无济于事。
它肯定与测试 IConnectableObservable 相关,因为删除 Publish
(即使其成为普通的冷可观察对象)使测试通过。
完整性检查and/or建议将不胜感激。
未处置的出版商再次罢工。
通常的嫌疑人:
var observable = CreateObservable(scheduler);
scheduler.Start(() => { observable.Connect(); return observable; }, ...
要实际处理间隔计时器,您需要一种方法来处理来自 observable.Connect()
的订阅,而不是 Start
方法的订阅。
连接后,您的时间间隔会使用测试调度程序(尽可能快地)启动项目,而取消订阅实际上不会做任何事情,将其保留 运行 - 测试调度程序将从未完成。
通常,确保资源处置的一种方法是使用 Using
。
scheduler.Start(() => Observable.Using(() => observable.Connect(), _ => observable), ...
但是,当取消订阅下游可观察对象时,确保释放原始连接的更简单方法是使用 RefCount
。
scheduler.Start(() => CreateObservable(scheduler).RefCount(), ...