确保在进程终止之前完成异步 OnNext 代码

Ensuring completion of async OnNext code before process terminates

下面的单元测试永远不会打印 "Async 3",因为测试先完成。我怎样才能确保它运行完成?我能想出的最好办法是在末尾任意 Task.Delay 或 WriteAsync().Result,两者都不理想。

    public async Task TestMethod1() // eg. webjob
    {
        TestContext.WriteLine("Starting test...");
        var observable = Observable.Create<int>(async ob =>
        {
            ob.OnNext(1);
            await Task.Delay(1000); // Fake async REST api call
            ob.OnNext(2);
            await Task.Delay(1000);
            ob.OnNext(3);
            ob.OnCompleted();
        });

        observable.Subscribe(i => TestContext.WriteLine($"Sync {i}"));
        observable.SelectMany(i => WriteAsync(i).ToObservable()).Subscribe();

        await observable;
        TestContext.WriteLine("Complete.");
    }

    public async Task WriteAsync(int value) // Fake async DB call
    {
        await Task.Delay(1000);
        TestContext.WriteLine($"Async {value}");
    }

编辑 我意识到提到单元测试可能 misleading.This 不是测试问题。该代码是对 Azure WebJob 中进程 运行 的真实问题的模拟,其中生产者和消费者都需要调用一些异步 IO。问题是 webjob 在消费者真正完成之前运行完成。这是因为我无法弄清楚如何正确地等待消费者方面的任何事情。也许这对 RX 来说是不可能的...

好吧,您可以使用 Observable.ForEach 进行阻塞,直到 IObservable 终止:

observable.ForEach(unusedValue => { });

你能使 TestMethod1 成为一个普通的非异步方法,然后用这个替换 await observable; 吗?

编辑: 您基本上是在寻找阻塞操作员。旧的阻塞运算符(如 ForEach)已被弃用,取而代之的是异步版本。你想像这样等待最后一项:

public async Task TestMethod1()
{
    TestContext.WriteLine("Starting test...");
    var observable = Observable.Create<int>(async ob =>
    {
        ob.OnNext(1);
        await Task.Delay(1000);
        ob.OnNext(2);
        await Task.Delay(1000);
        ob.OnNext(3);
        ob.OnCompleted();
    });

    observable.Subscribe(i => TestContext.WriteLine($"Sync {i}"));
    var selectManyObservable = observable.SelectMany(i => WriteAsync(i).ToObservable()).Publish().RefCount();
    selectManyObservable.Subscribe();
    await selectManyObservable.LastOrDefaultAsync();
    TestContext.WriteLine("Complete.");
}

虽然这将解决您眼前的问题,但由于以下原因(我又添加了两个),您似乎要继续 运行 解决问题。 Rx 在正确使用时非常强大,而在不正确使用时会令人困惑。

旧答案:


几件事:

  1. 混合使用 async/await 和 Rx 通常会导致两者兼而有之。
  2. Rx 具有强大的测试功能。你没有使用它。
  3. 副作用,如 WriteLine 最好只在订阅中执行,而不是在像 SelectMany.
  4. 这样的运算符中执行
  5. 您可能想温习一下冷与热可观察量。
  6. 它没有 运行 完成的原因是因为你的测试 运行ner。您的测试 运行ner 将在 TestMethod1 结束时终止测试。否则 Rx 订阅将继续存在。当我在 Linqpad 中 运行 你的代码时,我得到以下输出:

    开始测试...
    同步 1
    同步 2
    异步 1
    同步 3
    异步 2
    完成。
    异步 3

...这就是我假设您想要看到的内容,除了您可能想要 Async 3 之后的 Complete。


仅使用 Rx,您的代码将如下所示:

public void TestMethod1()
{
    TestContext.WriteLine("Starting test...");
    var observable = Observable.Concat<int>(
        Observable.Return(1),
        Observable.Empty<int>().Delay(TimeSpan.FromSeconds(1)),
        Observable.Return(2),
        Observable.Empty<int>().Delay(TimeSpan.FromSeconds(1)),
        Observable.Return(3)
    );

    var syncOutput = observable
        .Select(i => $"Sync {i}");
    syncOutput.Subscribe(s => TestContext.WriteLine(s));

    var asyncOutput = observable
        .SelectMany(i => WriteAsync(i, scheduler));
    asyncOutput.Subscribe(s => TestContext.WriteLine(s), () => TestContext.WriteLine("Complete."));
}

public IObservable<string> WriteAsync(int value, IScheduler scheduler)
{
    return Observable.Return(value)
        .Delay(TimeSpan.FromSeconds(1), scheduler)
        .Select(i => $"Async {value}");
}


public static class TestContext
{
    public static void WriteLine(string s)
    {
        Console.WriteLine(s);
    }
}

这仍然没有利用 Rx 的测试功能。看起来像这样:

public void TestMethod1()
{
    var scheduler = new TestScheduler();
    TestContext.WriteLine("Starting test...");
    var observable = Observable.Concat<int>(
        Observable.Return(1),
        Observable.Empty<int>().Delay(TimeSpan.FromSeconds(1), scheduler),
        Observable.Return(2),
        Observable.Empty<int>().Delay(TimeSpan.FromSeconds(1), scheduler),
        Observable.Return(3)
    );

    var syncOutput = observable
        .Select(i => $"Sync {i}");
    syncOutput.Subscribe(s => TestContext.WriteLine(s));

    var asyncOutput = observable
        .SelectMany(i => WriteAsync(i, scheduler));
    asyncOutput.Subscribe(s => TestContext.WriteLine(s), () => TestContext.WriteLine("Complete."));

    var asyncExpected = scheduler.CreateColdObservable<string>(
        ReactiveTest.OnNext(1000.Ms(), "Async 1"),
        ReactiveTest.OnNext(2000.Ms(), "Async 2"),
        ReactiveTest.OnNext(3000.Ms(), "Async 3"),
        ReactiveTest.OnCompleted<string>(3000.Ms() + 1) //+1 because you can't have two notifications on same tick
    );

    var syncExpected = scheduler.CreateColdObservable<string>(
        ReactiveTest.OnNext(0000.Ms(), "Sync 1"),
        ReactiveTest.OnNext(1000.Ms(), "Sync 2"),
        ReactiveTest.OnNext(2000.Ms(), "Sync 3"),
        ReactiveTest.OnCompleted<string>(2000.Ms()) //why no +1 here?
    );

    var asyncObserver = scheduler.CreateObserver<string>();
    asyncOutput.Subscribe(asyncObserver);
    var syncObserver = scheduler.CreateObserver<string>();
    syncOutput.Subscribe(syncObserver);
    scheduler.Start();
    ReactiveAssert.AreElementsEqual(
        asyncExpected.Messages,
        asyncObserver.Messages);

    ReactiveAssert.AreElementsEqual(
        syncExpected.Messages,
        syncObserver.Messages);
}

public static class MyExtensions
{
    public static long Ms(this int ms)
    {
        return TimeSpan.FromMilliseconds(ms).Ticks;
    }

}

...所以与您的任务测试不同,您不必等待。测试立即执行。您可以将延迟时间增加到几分钟或几小时,TestScheduler 基本上会为您模拟时间。然后你的测试 运行ner 可能会很高兴。