使用 TestScheduler 测试 IConnectableObservable

Testing an IConnectableObservable with the TestScheduler

好吧,已经晚了,但我终究无法弄清楚为什么会发生以下情况。

我正在尝试测试以下内容(已简化)IConnectableObservable<long>

private const int PollingIntervalMinutes = 5;

private IConnectableObservable<long> CreateObservable(IScheduler scheduler)
{
    return Observable
        .Interval(TimeSpan.FromMinutes(PollingIntervalMinutes), scheduler)
        .StartWith(0)
        .Publish();
}

如果我测试它 "long hand" 测试通过:

[Test]
public void ShouldReturnExpectedNumberOfMessagesLongHand()
{
    var scheduler = new TestScheduler();
    var observed = scheduler.CreateObserver<long>();

    var observable = CreateObservable(scheduler);

    observable.Subscribe(observed);

    observable.Connect();

    Assert.That(observed.Messages.Count, Is.EqualTo(1));

    scheduler.AdvanceBy(TimeSpan.FromMinutes(PollingIntervalMinutes).Ticks);

    Assert.That(observed.Messages.Count, Is.EqualTo(2));

    scheduler.AdvanceBy(TimeSpan.FromMinutes(PollingIntervalMinutes).Ticks);

    Assert.That(observed.Messages.Count, Is.EqualTo(3));

    scheduler.AdvanceBy(TimeSpan.FromMinutes(PollingIntervalMinutes).Ticks);

    Assert.That(observed.Messages.Count, Is.EqualTo(4));
}

但是,如果我使用 TestScheduler.Start 方法 - 如下 - 测试挂起并且永远不会到达 Assert:

[Test]
public void ShouldReturnExpectedNumberOfMessages()
{
    var scheduler = new TestScheduler();

    var observable = CreateObservable(scheduler);

    var observed = scheduler.Start(() => { observable.Connect(); return observable; }, TimeSpan.FromMinutes(PollingIntervalMinutes * 3).Ticks);

    Assert.That(observed.Messages.Count, Is.EqualTo(4));
}

通过在可观察对象中放置一个断点(即在额外的 SelectDo 上)我可以看到对 scheduler.Start 的调用导致底层可观察对象旋转(即击中断点数千次)而不是遵守预定时间。

我尝试了各种不同的方法来调用 IConnectableObservable 上的 Connect(即在调用开始之前连接,在 TestScheduler 中安排对 Connect 的调用等)但无济于事。

它肯定与测试 IConnectableObservable 相关,因为删除 Publish(即使其成为普通的冷可观察对象)使测试通过。

完整性检查and/or建议将不胜感激。

未处置的出版商再次罢工。

通常的嫌疑人:

var observable = CreateObservable(scheduler);    
scheduler.Start(() => { observable.Connect(); return observable; }, ...

要实际处理间隔计时器,您需要一种方法来处理来自 observable.Connect() 的订阅,而不是 Start 方法的订阅。

连接后,您的时间间隔会使用测试调度程序(尽可能快地)启动项目,而取消订阅实际上不会做任何事情,将其保留 运行 - 测试调度程序将从未完成。

通常,确保资源处置的一种方法是使用 Using

scheduler.Start(() => Observable.Using(() => observable.Connect(), _ => observable), ...

但是,当取消订阅下游可观察对象时,确保释放原始连接的更简单方法是使用 RefCount

scheduler.Start(() => CreateObservable(scheduler).RefCount(), ...