测试响应式扩展 - 如何将测试调度程序与 ToTask() 一起使用?

Testing Reactive Extensions - How do I use the test scheduler with ToTask()?

我在测试使用基于任务的服务的反应式代码时遇到问题。在我的 class 测试中,我使用了任务并使用 ToObservable 对其进行响应式操作。

public void Method()
{
  _svc.MyTaskServiceMethod().ToObservable().Select(....) //pipe it elsewhere and do interesting things.
}

现在在单元测试中我正在测试一些时间(使用 Moq 服务)

svcMock.Setup(x => x.MyTaskServiceMethod()).Returns(() =>
  Observable.Return("VALUE", testScheduler)
    .Delay(TimeSpan.FromMilliseconds(100), testScheduler)
    .ToTask()
  );

问题在于,尽管在 Return/Delay 调用中使用了测试调度程序,但任务本身仍在单独的线程上完成。我通过将当前托管线程 id 的几个控制台写入代码添加到代码中来看到这一点。

svcMock.Setup(x => x.MyServiceMethod()).Returns(() =>
{
  var task = Observable.Return("VALUE", testScheduler)
   .Delay(TimeSpan.FromMilliseconds(1000), testScheduler)
   .Do(x => { Console.WriteLine(Thread.CurrentThread.ManagedThreadId.ToString() + " Obs"); })
   .ToTask();

   task.ContinueWith((_) =>
   {
       Console.WriteLine(Thread.CurrentThread.ManagedThreadId.ToString() + " Task");
   });
   return task;
});

Do(..) 在主测试线程上执行,并且恰好在我预期的 testSchduler.AdvanceBy(..) 调用后发生。

任务继续仍在单独的线程中发生,并且基本上在单元测试主体完成后才执行。所以在我的目标体内,没有任何东西真正被推过我的 task.ToObservable() observable.

默认情况下,任务延续将使用任务池线程,因此您的延续会逃脱测试调度程序的控制。如果您指定选项 TaskContinuationOptions.ExecuteSynchronously,它将使用相同的线程,并且结果将根据需要发布到可观察对象:

task.ContinueWith((_) =>
{
    Console.WriteLine(Thread.CurrentThread.ManagedThreadId.ToString() + " Task");
}, TaskContinuationOptions.ExecuteSynchronously);

附录

您可能会发现 this related discussion on the Rx site 对 TPL -> Rx 转换中的并发主题非常有启发性,尤其是 ToObservable()

前段时间,我与人合着了一个基于 NUnit 的单元测试库,以帮助精确地进行 Rx 和 TPL 测试。为此,我们构建了一个测试 TPL 调度程序,以在没有并发的情况下强制所有 TPL 任务 运行。您可以在此处查看相关代码:https://github.com/Testeroids/Testeroids/blob/master/solution/src/app/Testeroids/TplTestPlatformHelper.cs#L87