处理 IConnectableObservable 不会发送 OnComplete
Disposing a IConnectableObservable doesn't send OnComplete
我在尝试处理最坏情况时遇到了一个问题;正在处理一个 IConnectableObservable
,有订阅,在它完成之前。
我编写了一个人为的示例来重现该问题。
var hotSource = Observable
.Return(1)
.Delay(TimeSpan.FromMilliseconds(500))
.Publish();
var disposable = hotSource.Connect();
这是我的热门 Observable。我添加了一个延迟,以便我可以在它触发值之前订阅它。
下面我 return 一个可等待的可观察对象,它应该允许我等到 hotSource 完成。我相信我应该在这一点上订阅
var awaitable = hotSource
.Do((Console.WriteLine))
.Finally(() => Console.WriteLine("Complete"))
.LastOrDefaultAsync();
现在如果我立即处理然后等待我的订阅:
disposable.Dispose();
await awaitable;
它只是完全挂起应用程序并且永远不会调用 Finally
。
我希望我的 awaitable
达到 return OnCompleted
或最差 OnError
ObjectDisposedException
。此外,如果您在订阅后连接,它仍然会挂起。有什么想法吗?
[更新]
根据 supertoi 的回答(我在处置之后才订阅)我在处置之前通过显式订阅重新表述了问题。
var mutex = new SemaphoreSlim(0);
var hotSource = Observable
.Return(1)
.Delay(TimeSpan.FromMilliseconds(500))
.Publish();
var subscription = Observable.Create(
(IObserver<int> observer) =>
{
Console.WriteLine("Subscribed");
return hotSource.Subscribe(observer);
})
.Finally(() => mutex.Release())
.Subscribe(Console.WriteLine);
hotSource
.Connect()
.Dispose();
await mutex.WaitAsync();
Console.WriteLine("Complete");
这再次停止,但在它被处理之前挑衅地订阅了 IConnectableObservable
。
LastOrDefaultAsync()
returns 一个 IObservable
所以它不会让你订阅序列。
await
订阅了序列。
在 IConnectableObservable.Dispose()
上检查 the comment
Disposable used to disconnect the observable wrapper from its source, causing subscribed observer to stop receiving values from the underlying observable sequence.
这意味着停止发出所有通知,包括 OnNext
、OnError
和 OnCompleted
。
因此,如果您 await
在处理 IConnectableObservable
之后,您将不会收到任何通知。您可以Connect
再次到hotsource
接收通知。
我在尝试处理最坏情况时遇到了一个问题;正在处理一个 IConnectableObservable
,有订阅,在它完成之前。
我编写了一个人为的示例来重现该问题。
var hotSource = Observable
.Return(1)
.Delay(TimeSpan.FromMilliseconds(500))
.Publish();
var disposable = hotSource.Connect();
这是我的热门 Observable。我添加了一个延迟,以便我可以在它触发值之前订阅它。
下面我 return 一个可等待的可观察对象,它应该允许我等到 hotSource 完成。我相信我应该在这一点上订阅
var awaitable = hotSource
.Do((Console.WriteLine))
.Finally(() => Console.WriteLine("Complete"))
.LastOrDefaultAsync();
现在如果我立即处理然后等待我的订阅:
disposable.Dispose();
await awaitable;
它只是完全挂起应用程序并且永远不会调用 Finally
。
我希望我的 awaitable
达到 return OnCompleted
或最差 OnError
ObjectDisposedException
。此外,如果您在订阅后连接,它仍然会挂起。有什么想法吗?
[更新]
根据 supertoi 的回答(我在处置之后才订阅)我在处置之前通过显式订阅重新表述了问题。
var mutex = new SemaphoreSlim(0);
var hotSource = Observable
.Return(1)
.Delay(TimeSpan.FromMilliseconds(500))
.Publish();
var subscription = Observable.Create(
(IObserver<int> observer) =>
{
Console.WriteLine("Subscribed");
return hotSource.Subscribe(observer);
})
.Finally(() => mutex.Release())
.Subscribe(Console.WriteLine);
hotSource
.Connect()
.Dispose();
await mutex.WaitAsync();
Console.WriteLine("Complete");
这再次停止,但在它被处理之前挑衅地订阅了 IConnectableObservable
。
LastOrDefaultAsync()
returns 一个 IObservable
所以它不会让你订阅序列。
await
订阅了序列。
在 IConnectableObservable.Dispose()
Disposable used to disconnect the observable wrapper from its source, causing subscribed observer to stop receiving values from the underlying observable sequence.
这意味着停止发出所有通知,包括 OnNext
、OnError
和 OnCompleted
。
因此,如果您 await
在处理 IConnectableObservable
之后,您将不会收到任何通知。您可以Connect
再次到hotsource
接收通知。