取消订阅后,我如何等待一切都在 Rx 可观察序列中完成?

How can I await that everything is done in a Rx observable sequence after unsubscribe?

简介

在我的 WPF C# .NET 应用程序中,我使用响应式扩展 (Rx) 来订阅事件,并且我经常需要从数据库中重新加载某些内容以获取更新 UI 所需的值,因为事件对象通常只包含 ID 和一些元数据。

我使用 Rx 调度在后台加载数据并在调度程序上更新 UI。我在 Rx 序列中混合 "Task.Run" 有过一些糟糕的经历(当使用 "SelectMany" 时,顺序不再得到保证,并且很难控制单元测试中的调度)。另请参阅:在反应式管道中执行 TPL 代码并通过测试调度程序控制执行

我的问题

如果我关闭我的应用程序(或关闭选项卡),我想取消订阅然后等待数据库调用(从 Rx "Select" 调用),在 "subscription.Dispose"。直到现在我还没有找到任何好的实用程序或简单的方法来做到这一点。

问题

是否有任何框架支持在 Rx 链中等待 一切 仍然 运行?

如果没有,您对如何制作一个易于使用的实用程序有什么好的想法吗?

有什么好的替代方法可以达到同样的目的吗?

例子

public async Task AwaitEverythingInARxChain()
{
    // In real life this is a hot observable event sequence which never completes
    IObservable<int> eventSource = Enumerable.Range(1, int.MaxValue).ToObservable();

    IDisposable subscription = eventSource
        // Load data in the background
        .ObserveOn(Scheduler.Default)
        .Select(id => LoadFromDatabase(id))

        // Update UI on the dispatcher
        .ObserveOn(DispatcherScheduler.Current)
        .SubscribeOn(Scheduler.Default) // In real life the source produces the event values on a background thread.
        .Subscribe(loadedData => UpdateUi(loadedData));

    Thread.Sleep(TimeSpan.FromSeconds(10));
// In real life I want to cancel (unsubscribe) here because the user has closed the Application or closed the tab and return a task which completes when everything is done.

    // Unsubscribe just guarantees that no "OnNext" is called anymore, but it doesn't wait until all operations in the sequence are finished (for example "LoadFromDatabase(id)" can still be runnig here.
    subscription.Dispose();

    await ?; // I need to await here, so that i can be sure that no "LoadFromDatabase(id)" is running anymore.

    ShutDownDatabase();
}

我已经尝试过(但没有成功)

更新:带有控制台输出和 TakeUntil

的示例
public async Task Main()
{
    Observable
        .Timer(TimeSpan.FromSeconds(5.0))
        .Subscribe(x =>
        {
            Console.WriteLine("Cancel started");
            _shuttingDown.OnNext(Unit.Default);
        });

    await AwaitEverythingInARxChain();
    Console.WriteLine("Cancel finished");
    ShutDownDatabase();
    Thread.Sleep(TimeSpan.FromSeconds(3));
}

private Subject<Unit> _shuttingDown = new Subject<Unit>();

public async Task AwaitEverythingInARxChain()
{
    IObservable<int> eventSource = Observable.Range(0, 10);

    await eventSource
        .ObserveOn(Scheduler.Default)
        .Select(id => LoadFromDatabase(id))
        .ObserveOn(Scheduler.Default)
        .TakeUntil(_shuttingDown)
        .Do(loadedData => UpdateUi(loadedData));
}

public int LoadFromDatabase(int x)
{
    Console.WriteLine("Start LoadFromDatabase: " + x);
    Thread.Sleep(1000);
    Console.WriteLine("Finished LoadFromDatabase: " + x);

    return x;
}

public void UpdateUi(int x)
{
    Console.WriteLine("UpdateUi: " + x);
}

public void ShutDownDatabase()
{
    Console.WriteLine("ShutDownDatabase");
}

输出(实际):

Start LoadFromDatabase: 0
Finished LoadFromDatabase: 0
Start LoadFromDatabase: 1
UpdateUi: 0
Finished LoadFromDatabase: 1
Start LoadFromDatabase: 2
UpdateUi: 1
Finished LoadFromDatabase: 2
Start LoadFromDatabase: 3
UpdateUi: 2
Finished LoadFromDatabase: 3
Start LoadFromDatabase: 4
UpdateUi: 3
Cancel started
Cancel finished
ShutDownDatabase
Finished LoadFromDatabase: 4
Start LoadFromDatabase: 5
Finished LoadFromDatabase: 5
Start LoadFromDatabase: 6
Finished LoadFromDatabase: 6
Start LoadFromDatabase: 7

预计: 我想保证以下是最后的输出:

Cancel finished
ShutDownDatabase

你需要等待的东西。您不能等待订阅处理。最简单的方法是将处理逻辑变成可观察对象本身的一部分:

var observable = eventSource
    // Load data in the background
    .ObserveOn(Scheduler.Default)
    .Select(id => LoadFromDatabase(id))
    .TakeUntil(Observable.Timer(TimeSpan.FromSeconds(10))) //This replaces your Thread.Sleep call
    .Publish()
    .RefCount();

var subscription = observable.ObserveOn(DispatcherScheduler.Current)
    .Subscribe(loadedData => UpdateUi(loadedData));

//do whatever you want here.

await observable.LastOrDefault();

这比您想象的要容易。您可以 await 观察值。所以只需这样做:

public async Task AwaitEverythingInARxChain()
{
    IObservable<int> eventSource = Enumerable.Range(1, 10).ToObservable();

    await eventSource
        .ObserveOn(Scheduler.Default)
        .Select(id => LoadFromDatabase(id))
        .ObserveOn(DispatcherScheduler.Current)
        .Do(loadedData => UpdateUi(loadedData), () => ShutDownDatabase());
}

在您的方法中使用一些 Console.WriteLine 操作,并在 db 调用中休眠一个小线程以模拟网络延迟,我得到以下输出:

LoadFromDatabase: 1
LoadFromDatabase: 2
UpdateUi: 1
LoadFromDatabase: 3
UpdateUi: 2
LoadFromDatabase: 4
UpdateUi: 3
LoadFromDatabase: 5
UpdateUi: 4
LoadFromDatabase: 6
UpdateUi: 5
LoadFromDatabase: 7
UpdateUi: 6
LoadFromDatabase: 8
UpdateUi: 7
LoadFromDatabase: 9
UpdateUi: 8
LoadFromDatabase: 10
UpdateUi: 9
UpdateUi: 10
ShutDownDatabase

如果需要结束查询,只需创建一个shuttingDown主题:

private Subject<Unit> _shuttingDown = new Subject<Unit>();

...然后像这样修改查询:

    await eventSource
        .ObserveOn(Scheduler.Default)
        .Select(id => LoadFromDatabase(id))
        .ObserveOn(DispatcherScheduler.Current)
        .Do(
            loadedData => UpdateUi(loadedData),
            () => ShutDownDatabase())
        .TakeUntil(_shuttingDown);

您只需发出 _shuttingDown.OnNext(Unit.Default); 即可取消订阅可观察对象。


这是我完整的工作测试代码:

async Task Main()
{
    Observable
        .Timer(TimeSpan.FromSeconds(5.0))
        .Subscribe(x => _shuttingDown.OnNext(Unit.Default));

    await AwaitEverythingInARxChain();
}

private Subject<Unit> _shuttingDown = new Subject<Unit>();

public async Task AwaitEverythingInARxChain()
{
    IObservable<int> eventSource = Observable.Range(0, 10);

    await eventSource
        .ObserveOn(Scheduler.Default)
        .Select(id => LoadFromDatabase(id))
        .ObserveOn(DispatcherScheduler.Current)
        .Finally(() => ShutDownDatabase())
        .TakeUntil(_shuttingDown)
        .Do(loadedData => UpdateUi(loadedData));
}

public int LoadFromDatabase(int x)
{
    Console.WriteLine("LoadFromDatabase: " + x);
    Thread.Sleep(1000);
    return x;
}

public void UpdateUi(int x)
{
    Console.WriteLine("UpdateUi: " + x);
}

public void ShutDownDatabase()
{
    Console.WriteLine("ShutDownDatabase");
}

我得到这个输出:

LoadFromDatabase: 0
LoadFromDatabase: 1
UpdateUi: 0
LoadFromDatabase: 2
UpdateUi: 1
LoadFromDatabase: 3
UpdateUi: 2
LoadFromDatabase: 4
UpdateUi: 3
LoadFromDatabase: 5
UpdateUi: 4
ShutDownDatabase

请注意,可观察对象试图在 10 秒内生成 10 个值,但它被 OnNext 缩短了。

我终于自己找到了解决办法。 您可以使用 TakeWhile 来实现它。 TakeUntil 不起作用,因为当第二个可观察序列产生第一个值时,主要可观察序列立即完成。

下面是工作解决方案的示例:

     public async Task Main_Solution()
    {
        CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();

        Observable
            .Timer(TimeSpan.FromSeconds(4))
            .Subscribe(x =>
            {
                Console.WriteLine("Cancel startedthread='{0}'", Thread.CurrentThread.ManagedThreadId);
                cancellationTokenSource.Cancel();
            });

        await AwaitEverythingInARxChain(cancellationTokenSource.Token);
        Console.WriteLine("Cancel finished thread='{0}'", Thread.CurrentThread.ManagedThreadId);
        ShutDownDatabase();
        Thread.Sleep(TimeSpan.FromSeconds(10));
    }

    public async Task AwaitEverythingInARxChain(CancellationToken token)
    {
        IObservable<int> eventSource = Observable.Range(0, 10);

        await eventSource
            .ObserveOn(Scheduler.Default)
            .Select(id => LoadFromDatabase(id))
            .TakeWhile(_ => !token.IsCancellationRequested)
            .ObserveOn(Scheduler.Default) // Dispatcher in real life
            .Do(loadedData => UpdateUi(loadedData)).LastOrDefaultAsync();
    }

    public int LoadFromDatabase(int x)
    {
        Console.WriteLine("Start LoadFromDatabase: {0} thread='{1}'", x, Thread.CurrentThread.ManagedThreadId);
        Thread.Sleep(TimeSpan.FromSeconds(3));
        Console.WriteLine("Finished LoadFromDatabase: {0} thread='{1}'", x, Thread.CurrentThread.ManagedThreadId);

        return x;
    }

    public void UpdateUi(int x)
    {
        Console.WriteLine("UpdateUi: '{0}' thread='{1}'", x, Thread.CurrentThread.ManagedThreadId);
    }

    public void ShutDownDatabase()
    {
        Console.WriteLine("ShutDownDatabase thread='{0}'", Thread.CurrentThread.ManagedThreadId);
    }

并且输出:

Start LoadFromDatabase: 0 thread='9'
Finished LoadFromDatabase: 0 thread='9'
Start LoadFromDatabase: 1 thread='9'
UpdateUi: '0' thread='10'
Cancel startedthread='4'
Finished LoadFromDatabase: 1 thread='9'
Cancel finished thread='10'
ShutDownDatabase thread='10'

请注意,"ShutDownDatabase" 是最后一个输出(正如预期的那样)。它一直等到 "LoadFromDatabase" 完成第二个值,即使它的生成值没有被进一步处理。这正是我想要的。