处理 IConnectableObservable 不会发送 OnComplete

Disposing a IConnectableObservable doesn't send OnComplete

我在尝试处理最坏情况时遇到了一个问题;正在处理一个 IConnectableObservable,有订阅,在它完成之前。

我编写了一个人为的示例来重现该问题。

var hotSource = Observable
    .Return(1)
    .Delay(TimeSpan.FromMilliseconds(500))
    .Publish();

var disposable = hotSource.Connect();

这是我的热门 Observable。我添加了一个延迟,以便我可以在它触发值之前订阅它。

下面我 return 一个可等待的可观察对象,它应该允许我等到 hotSource 完成。我相信我应该在这一点上订阅

var awaitable = hotSource
    .Do((Console.WriteLine))
    .Finally(() => Console.WriteLine("Complete"))
    .LastOrDefaultAsync();

现在如果我立即处理然后等待我的订阅:

disposable.Dispose();
await awaitable;

它只是完全挂起应用程序并且永远不会调用 Finally

我希望我的 awaitable 达到 return OnCompleted 或最差 OnError ObjectDisposedException。此外,如果您在订阅后连接,它仍然会挂起。有什么想法吗?

[更新]

根据 supertoi 的回答(我在处置之后才订阅)我在处置之前通过显式订阅重新表述了问题。

var mutex = new SemaphoreSlim(0);

var hotSource = Observable
    .Return(1)
    .Delay(TimeSpan.FromMilliseconds(500))
    .Publish();

var subscription = Observable.Create(
    (IObserver<int> observer) =>
    {
        Console.WriteLine("Subscribed");
        return hotSource.Subscribe(observer);
    })
    .Finally(() => mutex.Release())
    .Subscribe(Console.WriteLine);

hotSource
    .Connect()
    .Dispose();

await mutex.WaitAsync();
Console.WriteLine("Complete");

这再次停止,但在它被处理之前挑衅地订阅了 IConnectableObservable

LastOrDefaultAsync() returns 一个 IObservable 所以它不会让你订阅序列。 await 订阅了序列。

IConnectableObservable.Dispose()

上检查 the comment

Disposable used to disconnect the observable wrapper from its source, causing subscribed observer to stop receiving values from the underlying observable sequence.

这意味着停止发出所有通知,包括 OnNextOnErrorOnCompleted

因此,如果您 await 在处理 IConnectableObservable 之后,您将不会收到任何通知。您可以Connect再次到hotsource接收通知。