如何阻止 IConnectableObservable.Wait() 吞下未处理的异常?

How can I stop IConnectableObservable.Wait() from swallowing unhandled exceptions?

我有一个控制台应用程序使用 Rx.NET。

我需要阻塞直到 IConnectableObservable 完成,为此我正在使用 IConnectableObservable.Wait()

当抛出未处理的异常时,它会被吞噬并且应用程序会挂起。我希望应用程序崩溃,并将堆栈轨迹打印到控制台。

我不想向我的 IConnectableObserver 添加 OnError 处理程序,因为这会丢失原始堆栈跟踪。

我已经尝试在未发布的可观察对象上使用 .Wait() 方法,但是这种重新订阅会导致不良行为。

我试过使用 .GetAwaiter().GetResult() 代替,但这有同样的问题。

var connectable = myObservable.Publish();

connectable.Subscribe(myObserver1);
connectable.Subscribe(myObserver2);

connectable.Connect();
connectcable.Wait();

如何在保留典型的未处理异常行为的同时等待 IConnectableObservable 完成?

这里的事件链有些误导。 错误并没有被吞噬 - 远非如此,它正在被重新抛出。

通常怀疑是一些奇怪的并发和调度问题,但没有人怀疑 Subscribe 方法。

当您使用您自己的 IObserver<T> 以外的其他内容调用 Subscribe 时,您正在使用这些默认操作创建 AnonymousObserver

new AnonymousObserver<T>(Ignore, Throw, Nop)

有效

new AnonymousObserver<T>(_ => {}, exn => throw exn, () => {})

默认错误处理程序将在您正在观察的任何上下文中抛出错误。哎呀。有时它可能是 AppDomain 计时器,或在池线程上,并且由于无法处理,您的应用程序将关闭。

因此,如果我们更改示例以在虚拟处理程序中提供,

var myObservable = Observable.Interval(TimeSpan.FromMilliseconds(100)).Take(4).Concat(Observable.Throw(new Exception(), 1L));
var connectable = myObservable.Publish();

connectable.Subscribe(Console.WriteLine, exn => Console.WriteLine("Handled"));
connectable.Subscribe(Console.WriteLine, exn => Console.WriteLine("Handled"));

connectable.Connect();

try
{
    connectable.Wait();
}
catch (Exception)
{
    Console.WriteLine("An error, but I'm safe");
}

您可以像您期望的那样处理 Wait 中的错误。