单元测试失败 Observable.FromAsync 和 Observable.Switch
Unit tests failing with Observable.FromAsync and Observable.Switch
我在测试使用 Observable.FromAsync<T>()
和 Observable.Switch<T>()
的 class 时遇到问题。它所做的是等待触发器 observable 产生一个值,然后它开始一个异步操作,最后在一个输出序列中重新收集所有操作的结果。它的要点是这样的:
var outputStream = triggerStream
.Select(_ => Observable
.FromAsync(token => taskProducer.DoSomethingAsync(token)))
.Switch();
我用最少的部分进行了一些健全性检查测试以了解发生了什么,这是测试结果在评论中:
class test_with_rx : nspec
{
void Given_async_task_and_switch()
{
Subject<Unit> triggerStream = null;
TaskCompletionSource<long> taskDriver = null;
ITestableObserver<long> testObserver = null;
IDisposable subscription = null;
before = () =>
{
TestScheduler scheduler = new TestScheduler();
testObserver = scheduler.CreateObserver<long>();
triggerStream = new Subject<Unit>();
taskDriver = new TaskCompletionSource<long>();
// build stream under test
IObservable<long> streamUnderTest = triggerStream
.Select(_ => Observable
.FromAsync(token => taskDriver.Task))
.Switch();
/* Also tried with this Switch() overload
IObservable<long> streamUnderTest = triggerStream
.Select(_ => taskDriver.Task)
.Switch(); */
subscription = streamUnderTest.Subscribe(testObserver);
};
context["Before trigger"] = () =>
{
it["Should not notify"] = () => testObserver.Messages.Count.Should().Be(0);
// PASSED
};
context["After trigger"] = () =>
{
before = () => triggerStream.OnNext(Unit.Default);
context["When task completes"] = () =>
{
long result = -1;
before = () =>
{
taskDriver.SetResult(result);
//taskDriver.Task.Wait(); // tried with this too
};
it["Should notify once"] = () => testObserver.Messages.Count.Should().Be(1);
// FAILED: expected 1, actual 0
it["Should notify task result"] = () => testObserver.Messages[0].Value.Value.Should().Be(result);
// FAILED: of course, index out of bound
};
};
after = () =>
{
taskDriver.TrySetCanceled();
taskDriver.Task.Dispose();
subscription.Dispose();
};
}
}
在我也用模拟完成的其他测试中,我可以看到实际上调用了传递给 FromAsync 的 Func(例如 taskProducer.DoSomethingAsync(token)
),但随后看起来没有其他内容,并且输出流不产生价值。
我也尝试在达到预期之前插入一些 Task.Delay(x).Wait()
或一些 taskDriver.Task.Wait()
,但没有成功。
我读过 this SO thread 并且知道调度程序,但乍一看我认为我不需要它们,没有 ObserveOn()
被使用。我错了吗?我错过了什么? TA
为了完整起见,测试框架是 NSpec,断言库是 FluentAssertions。
您遇到的是一起测试 Rx 和 TPL 的情况。
可以找到详尽的解释 ,但我会尝试为您的特定代码提供建议。
基本上你的代码工作正常,但你的测试不是。
Observable.FromAsync
将转换为所提供任务的 ContinueWith
,该任务将在任务池中执行,因此是异步的。
修复 测试的多种方法:(从难看到复杂)
结果集后休眠(注意等待不起作用,因为 Wait 不等待继续)
taskDriver.SetResult(result);
Thread.Sleep(50);
在执行之前设置结果FromAsync
(因为如果任务完成,FromAsync 将return 一个立即的 IObservable,也就是跳过 ContinueWith
)
taskDriver.SetResult(result);
triggerStream.OnNext(Unit.Default);
将 FromAsync 替换为可测试的替代方案,例如
public static IObservable<T> ToObservable<T>(Task<T> task, TaskScheduler scheduler)
{
if (task.IsCompleted)
{
return task.ToObservable();
}
else
{
AsyncSubject<T> asyncSubject = new AsyncSubject<T>();
task.ContinueWith(t => task.ToObservable().Subscribe(asyncSubject), scheduler);
return asyncSubject.AsObservable<T>();
}
}
(使用 synchronous TaskScheduler, or a testable 之一)
我在测试使用 Observable.FromAsync<T>()
和 Observable.Switch<T>()
的 class 时遇到问题。它所做的是等待触发器 observable 产生一个值,然后它开始一个异步操作,最后在一个输出序列中重新收集所有操作的结果。它的要点是这样的:
var outputStream = triggerStream
.Select(_ => Observable
.FromAsync(token => taskProducer.DoSomethingAsync(token)))
.Switch();
我用最少的部分进行了一些健全性检查测试以了解发生了什么,这是测试结果在评论中:
class test_with_rx : nspec
{
void Given_async_task_and_switch()
{
Subject<Unit> triggerStream = null;
TaskCompletionSource<long> taskDriver = null;
ITestableObserver<long> testObserver = null;
IDisposable subscription = null;
before = () =>
{
TestScheduler scheduler = new TestScheduler();
testObserver = scheduler.CreateObserver<long>();
triggerStream = new Subject<Unit>();
taskDriver = new TaskCompletionSource<long>();
// build stream under test
IObservable<long> streamUnderTest = triggerStream
.Select(_ => Observable
.FromAsync(token => taskDriver.Task))
.Switch();
/* Also tried with this Switch() overload
IObservable<long> streamUnderTest = triggerStream
.Select(_ => taskDriver.Task)
.Switch(); */
subscription = streamUnderTest.Subscribe(testObserver);
};
context["Before trigger"] = () =>
{
it["Should not notify"] = () => testObserver.Messages.Count.Should().Be(0);
// PASSED
};
context["After trigger"] = () =>
{
before = () => triggerStream.OnNext(Unit.Default);
context["When task completes"] = () =>
{
long result = -1;
before = () =>
{
taskDriver.SetResult(result);
//taskDriver.Task.Wait(); // tried with this too
};
it["Should notify once"] = () => testObserver.Messages.Count.Should().Be(1);
// FAILED: expected 1, actual 0
it["Should notify task result"] = () => testObserver.Messages[0].Value.Value.Should().Be(result);
// FAILED: of course, index out of bound
};
};
after = () =>
{
taskDriver.TrySetCanceled();
taskDriver.Task.Dispose();
subscription.Dispose();
};
}
}
在我也用模拟完成的其他测试中,我可以看到实际上调用了传递给 FromAsync 的 Func(例如 taskProducer.DoSomethingAsync(token)
),但随后看起来没有其他内容,并且输出流不产生价值。
我也尝试在达到预期之前插入一些 Task.Delay(x).Wait()
或一些 taskDriver.Task.Wait()
,但没有成功。
我读过 this SO thread 并且知道调度程序,但乍一看我认为我不需要它们,没有 ObserveOn()
被使用。我错了吗?我错过了什么? TA
为了完整起见,测试框架是 NSpec,断言库是 FluentAssertions。
您遇到的是一起测试 Rx 和 TPL 的情况。
可以找到详尽的解释
基本上你的代码工作正常,但你的测试不是。
Observable.FromAsync
将转换为所提供任务的 ContinueWith
,该任务将在任务池中执行,因此是异步的。
修复 测试的多种方法:(从难看到复杂)
结果集后休眠(注意等待不起作用,因为 Wait 不等待继续)
taskDriver.SetResult(result); Thread.Sleep(50);
在执行之前设置结果
FromAsync
(因为如果任务完成,FromAsync 将return 一个立即的 IObservable,也就是跳过ContinueWith
)taskDriver.SetResult(result); triggerStream.OnNext(Unit.Default);
将 FromAsync 替换为可测试的替代方案,例如
public static IObservable<T> ToObservable<T>(Task<T> task, TaskScheduler scheduler) { if (task.IsCompleted) { return task.ToObservable(); } else { AsyncSubject<T> asyncSubject = new AsyncSubject<T>(); task.ContinueWith(t => task.ToObservable().Subscribe(asyncSubject), scheduler); return asyncSubject.AsObservable<T>(); } }
(使用 synchronous TaskScheduler, or a testable 之一)