使用 TestScheduler 的 SubscribeOn 无法订阅,直到第 2 个 Tick

SubscribeOn with TestScheduler fails to Subscribe until 2nd Tick

我在使用 TestScheduler 调用 SubscribeOn 时看到以下不一致的行为。

        var testScheduler = new TestScheduler();
        var subject = new Subject<int>();

        testScheduler.Schedule(() => subject.OnNext(1));
        testScheduler.Schedule(TimeSpan.FromTicks(1), () => subject.OnNext(2));
        testScheduler.Schedule(TimeSpan.FromTicks(2), () => subject.OnNext(3));
        testScheduler.Schedule(TimeSpan.FromTicks(3), () => subject.OnNext(4));

        subject
            .SubscribeOn(testScheduler)
            .Subscribe(Console.WriteLine);

        testScheduler.Start();
        Console.ReadKey();

产生输出:

3
4

但是,如果您在安排之前订阅观察者,那么它会按预期工作。

        var testScheduler = new TestScheduler();
        var subject = new Subject<int>();

        subject
            .SubscribeOn(testScheduler)
            .Subscribe(Console.WriteLine);

        testScheduler.Schedule(() => subject.OnNext(1));
        testScheduler.Schedule(TimeSpan.FromTicks(1), () => subject.OnNext(2));
        testScheduler.Schedule(TimeSpan.FromTicks(2), () => subject.OnNext(3));
        testScheduler.Schedule(TimeSpan.FromTicks(3), () => subject.OnNext(4));


        testScheduler.Start();
        Console.ReadKey();

产生输出:

1
2
3
4

谁能解释这种行为,或者这是一个错误?

我相信正在发生的事情是你有效地安排了两件事在同一时间点发生。初始 OnNext(1) 调用和订阅,即 SubscribeOn 都计划在 tick 0 隐式发生。

当安排两件事同时发生时,首先安排的事情将 运行 首先,然后是第二件事,依此类推。它们都会在虚拟时钟上看到相同的时间,但它是单线程的,所以一次只能 运行 一件事。

在你的第一个例子中,你实际上有一个行动日志如下

Time (in Ticks) Action
---------------------------
       0        () => subject.OnNext(1)
       0        () => subject.Subscribe(..)
10000000        () => subject.OnNext(2)
20000000        () => subject.OnNext(3)
30000000        () => subject.OnNext(4)

在你的第二个例子中,日志看起来更像这样

Time (in Ticks) Action
---------------------------
       0        () => subject.Subscribe(..)
       0        () => subject.OnNext(1)
10000000        () => subject.OnNext(2)
20000000        () => subject.OnNext(3)
30000000        () => subject.OnNext(4)

所以虽然订阅和OnNext的时间相同,但顺序不同。将 testScheduler.Start(); 视为循环执行该日志的操作并推进时钟可能会有所帮助。考虑到这一点,应该清楚为什么您在第一个示例中看不到值 1。

我有两个建议:

  • 使用 TestScheduler 创建您的可观察序列而不是主题。
  • 避免在测试中将事情安排在时间 0。这是现实世界中不太可能发生的事情,它会导致看起来很奇怪的测试(你测试的东西是否相差一个)

你可以像这样重写你的测试

var testScheduler = new TestScheduler();
var observer = testScheduler.CreateObserver<int>();
var sequence = testScheduler.CreateHotObservable(
    ReactiveTest.OnNext(TimeSpan.FromSeconds(1).Ticks, 1),
    ReactiveTest.OnNext(TimeSpan.FromSeconds(2).Ticks, 2),
    ReactiveTest.OnNext(TimeSpan.FromSeconds(3).Ticks, 3),
    ReactiveTest.OnNext(TimeSpan.FromSeconds(4).Ticks, 4)
    );

sequence
    .SubscribeOn(testScheduler)
    .Subscribe(observer);
testScheduler.Start();

var expected = new[]
{
    ReactiveTest.OnNext(TimeSpan.FromSeconds(1).Ticks, 1),
    ReactiveTest.OnNext(TimeSpan.FromSeconds(2).Ticks, 2),
    ReactiveTest.OnNext(TimeSpan.FromSeconds(3).Ticks, 3),
    ReactiveTest.OnNext(TimeSpan.FromSeconds(4).Ticks, 4),
};
CollectionAssert.AreEqual(expected, observer.Messages);

如果你想减少代码中的噪音,你可以子class ReactiveTest class (在 Rx 测试中)这样你就可以直接访问 OnNext 工厂方法及其兄弟 OnError + OnCompleted。您还可以围绕 ticks 和 TimeSpans 做一些事情来减少那里的噪音。