确保在进程终止之前完成异步 OnNext 代码
Ensuring completion of async OnNext code before process terminates
下面的单元测试永远不会打印 "Async 3",因为测试先完成。我怎样才能确保它运行完成?我能想出的最好办法是在末尾任意 Task.Delay 或 WriteAsync().Result,两者都不理想。
public async Task TestMethod1() // eg. webjob
{
TestContext.WriteLine("Starting test...");
var observable = Observable.Create<int>(async ob =>
{
ob.OnNext(1);
await Task.Delay(1000); // Fake async REST api call
ob.OnNext(2);
await Task.Delay(1000);
ob.OnNext(3);
ob.OnCompleted();
});
observable.Subscribe(i => TestContext.WriteLine($"Sync {i}"));
observable.SelectMany(i => WriteAsync(i).ToObservable()).Subscribe();
await observable;
TestContext.WriteLine("Complete.");
}
public async Task WriteAsync(int value) // Fake async DB call
{
await Task.Delay(1000);
TestContext.WriteLine($"Async {value}");
}
编辑
我意识到提到单元测试可能 misleading.This 不是测试问题。该代码是对 Azure WebJob 中进程 运行 的真实问题的模拟,其中生产者和消费者都需要调用一些异步 IO。问题是 webjob 在消费者真正完成之前运行完成。这是因为我无法弄清楚如何正确地等待消费者方面的任何事情。也许这对 RX 来说是不可能的...
好吧,您可以使用 Observable.ForEach
进行阻塞,直到 IObservable
终止:
observable.ForEach(unusedValue => { });
你能使 TestMethod1
成为一个普通的非异步方法,然后用这个替换 await observable;
吗?
编辑:
您基本上是在寻找阻塞操作员。旧的阻塞运算符(如 ForEach
)已被弃用,取而代之的是异步版本。你想像这样等待最后一项:
public async Task TestMethod1()
{
TestContext.WriteLine("Starting test...");
var observable = Observable.Create<int>(async ob =>
{
ob.OnNext(1);
await Task.Delay(1000);
ob.OnNext(2);
await Task.Delay(1000);
ob.OnNext(3);
ob.OnCompleted();
});
observable.Subscribe(i => TestContext.WriteLine($"Sync {i}"));
var selectManyObservable = observable.SelectMany(i => WriteAsync(i).ToObservable()).Publish().RefCount();
selectManyObservable.Subscribe();
await selectManyObservable.LastOrDefaultAsync();
TestContext.WriteLine("Complete.");
}
虽然这将解决您眼前的问题,但由于以下原因(我又添加了两个),您似乎要继续 运行 解决问题。 Rx 在正确使用时非常强大,而在不正确使用时会令人困惑。
旧答案:
几件事:
- 混合使用 async/await 和 Rx 通常会导致两者兼而有之。
- Rx 具有强大的测试功能。你没有使用它。
- 副作用,如
WriteLine
最好只在订阅中执行,而不是在像 SelectMany
. 这样的运算符中执行
- 您可能想温习一下冷与热可观察量。
它没有 运行 完成的原因是因为你的测试 运行ner。您的测试 运行ner 将在 TestMethod1
结束时终止测试。否则 Rx 订阅将继续存在。当我在 Linqpad 中 运行 你的代码时,我得到以下输出:
开始测试...
同步 1
同步 2
异步 1
同步 3
异步 2
完成。
异步 3
...这就是我假设您想要看到的内容,除了您可能想要 Async 3 之后的 Complete。
仅使用 Rx,您的代码将如下所示:
public void TestMethod1()
{
TestContext.WriteLine("Starting test...");
var observable = Observable.Concat<int>(
Observable.Return(1),
Observable.Empty<int>().Delay(TimeSpan.FromSeconds(1)),
Observable.Return(2),
Observable.Empty<int>().Delay(TimeSpan.FromSeconds(1)),
Observable.Return(3)
);
var syncOutput = observable
.Select(i => $"Sync {i}");
syncOutput.Subscribe(s => TestContext.WriteLine(s));
var asyncOutput = observable
.SelectMany(i => WriteAsync(i, scheduler));
asyncOutput.Subscribe(s => TestContext.WriteLine(s), () => TestContext.WriteLine("Complete."));
}
public IObservable<string> WriteAsync(int value, IScheduler scheduler)
{
return Observable.Return(value)
.Delay(TimeSpan.FromSeconds(1), scheduler)
.Select(i => $"Async {value}");
}
public static class TestContext
{
public static void WriteLine(string s)
{
Console.WriteLine(s);
}
}
这仍然没有利用 Rx 的测试功能。看起来像这样:
public void TestMethod1()
{
var scheduler = new TestScheduler();
TestContext.WriteLine("Starting test...");
var observable = Observable.Concat<int>(
Observable.Return(1),
Observable.Empty<int>().Delay(TimeSpan.FromSeconds(1), scheduler),
Observable.Return(2),
Observable.Empty<int>().Delay(TimeSpan.FromSeconds(1), scheduler),
Observable.Return(3)
);
var syncOutput = observable
.Select(i => $"Sync {i}");
syncOutput.Subscribe(s => TestContext.WriteLine(s));
var asyncOutput = observable
.SelectMany(i => WriteAsync(i, scheduler));
asyncOutput.Subscribe(s => TestContext.WriteLine(s), () => TestContext.WriteLine("Complete."));
var asyncExpected = scheduler.CreateColdObservable<string>(
ReactiveTest.OnNext(1000.Ms(), "Async 1"),
ReactiveTest.OnNext(2000.Ms(), "Async 2"),
ReactiveTest.OnNext(3000.Ms(), "Async 3"),
ReactiveTest.OnCompleted<string>(3000.Ms() + 1) //+1 because you can't have two notifications on same tick
);
var syncExpected = scheduler.CreateColdObservable<string>(
ReactiveTest.OnNext(0000.Ms(), "Sync 1"),
ReactiveTest.OnNext(1000.Ms(), "Sync 2"),
ReactiveTest.OnNext(2000.Ms(), "Sync 3"),
ReactiveTest.OnCompleted<string>(2000.Ms()) //why no +1 here?
);
var asyncObserver = scheduler.CreateObserver<string>();
asyncOutput.Subscribe(asyncObserver);
var syncObserver = scheduler.CreateObserver<string>();
syncOutput.Subscribe(syncObserver);
scheduler.Start();
ReactiveAssert.AreElementsEqual(
asyncExpected.Messages,
asyncObserver.Messages);
ReactiveAssert.AreElementsEqual(
syncExpected.Messages,
syncObserver.Messages);
}
public static class MyExtensions
{
public static long Ms(this int ms)
{
return TimeSpan.FromMilliseconds(ms).Ticks;
}
}
...所以与您的任务测试不同,您不必等待。测试立即执行。您可以将延迟时间增加到几分钟或几小时,TestScheduler
基本上会为您模拟时间。然后你的测试 运行ner 可能会很高兴。
下面的单元测试永远不会打印 "Async 3",因为测试先完成。我怎样才能确保它运行完成?我能想出的最好办法是在末尾任意 Task.Delay 或 WriteAsync().Result,两者都不理想。
public async Task TestMethod1() // eg. webjob
{
TestContext.WriteLine("Starting test...");
var observable = Observable.Create<int>(async ob =>
{
ob.OnNext(1);
await Task.Delay(1000); // Fake async REST api call
ob.OnNext(2);
await Task.Delay(1000);
ob.OnNext(3);
ob.OnCompleted();
});
observable.Subscribe(i => TestContext.WriteLine($"Sync {i}"));
observable.SelectMany(i => WriteAsync(i).ToObservable()).Subscribe();
await observable;
TestContext.WriteLine("Complete.");
}
public async Task WriteAsync(int value) // Fake async DB call
{
await Task.Delay(1000);
TestContext.WriteLine($"Async {value}");
}
编辑 我意识到提到单元测试可能 misleading.This 不是测试问题。该代码是对 Azure WebJob 中进程 运行 的真实问题的模拟,其中生产者和消费者都需要调用一些异步 IO。问题是 webjob 在消费者真正完成之前运行完成。这是因为我无法弄清楚如何正确地等待消费者方面的任何事情。也许这对 RX 来说是不可能的...
好吧,您可以使用 Observable.ForEach
进行阻塞,直到 IObservable
终止:
observable.ForEach(unusedValue => { });
你能使 TestMethod1
成为一个普通的非异步方法,然后用这个替换 await observable;
吗?
编辑:
您基本上是在寻找阻塞操作员。旧的阻塞运算符(如 ForEach
)已被弃用,取而代之的是异步版本。你想像这样等待最后一项:
public async Task TestMethod1()
{
TestContext.WriteLine("Starting test...");
var observable = Observable.Create<int>(async ob =>
{
ob.OnNext(1);
await Task.Delay(1000);
ob.OnNext(2);
await Task.Delay(1000);
ob.OnNext(3);
ob.OnCompleted();
});
observable.Subscribe(i => TestContext.WriteLine($"Sync {i}"));
var selectManyObservable = observable.SelectMany(i => WriteAsync(i).ToObservable()).Publish().RefCount();
selectManyObservable.Subscribe();
await selectManyObservable.LastOrDefaultAsync();
TestContext.WriteLine("Complete.");
}
虽然这将解决您眼前的问题,但由于以下原因(我又添加了两个),您似乎要继续 运行 解决问题。 Rx 在正确使用时非常强大,而在不正确使用时会令人困惑。
旧答案:
几件事:
- 混合使用 async/await 和 Rx 通常会导致两者兼而有之。
- Rx 具有强大的测试功能。你没有使用它。
- 副作用,如
WriteLine
最好只在订阅中执行,而不是在像SelectMany
. 这样的运算符中执行
- 您可能想温习一下冷与热可观察量。
它没有 运行 完成的原因是因为你的测试 运行ner。您的测试 运行ner 将在
TestMethod1
结束时终止测试。否则 Rx 订阅将继续存在。当我在 Linqpad 中 运行 你的代码时,我得到以下输出:开始测试...
同步 1
同步 2
异步 1
同步 3
异步 2
完成。
异步 3
...这就是我假设您想要看到的内容,除了您可能想要 Async 3 之后的 Complete。
仅使用 Rx,您的代码将如下所示:
public void TestMethod1()
{
TestContext.WriteLine("Starting test...");
var observable = Observable.Concat<int>(
Observable.Return(1),
Observable.Empty<int>().Delay(TimeSpan.FromSeconds(1)),
Observable.Return(2),
Observable.Empty<int>().Delay(TimeSpan.FromSeconds(1)),
Observable.Return(3)
);
var syncOutput = observable
.Select(i => $"Sync {i}");
syncOutput.Subscribe(s => TestContext.WriteLine(s));
var asyncOutput = observable
.SelectMany(i => WriteAsync(i, scheduler));
asyncOutput.Subscribe(s => TestContext.WriteLine(s), () => TestContext.WriteLine("Complete."));
}
public IObservable<string> WriteAsync(int value, IScheduler scheduler)
{
return Observable.Return(value)
.Delay(TimeSpan.FromSeconds(1), scheduler)
.Select(i => $"Async {value}");
}
public static class TestContext
{
public static void WriteLine(string s)
{
Console.WriteLine(s);
}
}
这仍然没有利用 Rx 的测试功能。看起来像这样:
public void TestMethod1()
{
var scheduler = new TestScheduler();
TestContext.WriteLine("Starting test...");
var observable = Observable.Concat<int>(
Observable.Return(1),
Observable.Empty<int>().Delay(TimeSpan.FromSeconds(1), scheduler),
Observable.Return(2),
Observable.Empty<int>().Delay(TimeSpan.FromSeconds(1), scheduler),
Observable.Return(3)
);
var syncOutput = observable
.Select(i => $"Sync {i}");
syncOutput.Subscribe(s => TestContext.WriteLine(s));
var asyncOutput = observable
.SelectMany(i => WriteAsync(i, scheduler));
asyncOutput.Subscribe(s => TestContext.WriteLine(s), () => TestContext.WriteLine("Complete."));
var asyncExpected = scheduler.CreateColdObservable<string>(
ReactiveTest.OnNext(1000.Ms(), "Async 1"),
ReactiveTest.OnNext(2000.Ms(), "Async 2"),
ReactiveTest.OnNext(3000.Ms(), "Async 3"),
ReactiveTest.OnCompleted<string>(3000.Ms() + 1) //+1 because you can't have two notifications on same tick
);
var syncExpected = scheduler.CreateColdObservable<string>(
ReactiveTest.OnNext(0000.Ms(), "Sync 1"),
ReactiveTest.OnNext(1000.Ms(), "Sync 2"),
ReactiveTest.OnNext(2000.Ms(), "Sync 3"),
ReactiveTest.OnCompleted<string>(2000.Ms()) //why no +1 here?
);
var asyncObserver = scheduler.CreateObserver<string>();
asyncOutput.Subscribe(asyncObserver);
var syncObserver = scheduler.CreateObserver<string>();
syncOutput.Subscribe(syncObserver);
scheduler.Start();
ReactiveAssert.AreElementsEqual(
asyncExpected.Messages,
asyncObserver.Messages);
ReactiveAssert.AreElementsEqual(
syncExpected.Messages,
syncObserver.Messages);
}
public static class MyExtensions
{
public static long Ms(this int ms)
{
return TimeSpan.FromMilliseconds(ms).Ticks;
}
}
...所以与您的任务测试不同,您不必等待。测试立即执行。您可以将延迟时间增加到几分钟或几小时,TestScheduler
基本上会为您模拟时间。然后你的测试 运行ner 可能会很高兴。