Why is my process still alive after an exception is thrown in SelectMany, whereas an exception in a similar Rx operator results in an unhandled exception?
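Here is a sample program that makes two subscriptions to console input (the source observable is not relevant here). The first subscription uses Observable.SelectMany; the second uses a similar SelectMany operator that internally relies on the System.Threading.Tasks.Dataflow package. In each of them an exception is thrown for certain inputs. The exception is correctly forwarded to the observer's OnError, which rethrows it in the default Subscribe implementation. The observed behavior is that the process keeps running when the exception occurs in SelectMany, but terminates with an unhandled exception when it occurs in SelectManyPreserveOrder.
What is the reason for the different behavior? Is there a way to achieve the 'more friendly' behavior in the SelectManyPreserveOrder operator?
This is a .NET 4.6.1 console application using Rx.Linq 2.2.5 and System.Threading.Tasks.Dataflow 4.10.0: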
using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Program
{
    static async Task Main()
    {
        AppDomain.CurrentDomain.UnhandledException += (sender, args) => Console.WriteLine("App domain unhandled exception");
        TaskScheduler.UnobservedTaskException += (sender, args) => Console.WriteLine("Unobserved task exception");

        var consoleInput = Helper.ConsoleInput();

        // First subscription: plain Rx SelectMany with an async selector.
        consoleInput.SelectMany(async input =>
            {
                await Task.Delay(50).ConfigureAwait(false);
                if (input == "1")
                    throw new Exception("This exception is swallowed");
                return input;
            })
            .Subscribe(s => Console.WriteLine($"SelectMany: {s}"));

        // Second subscription: the TPL Dataflow based operator defined below.
        consoleInput.SelectManyPreserveOrder(async input =>
            {
                await Task.Delay(50).ConfigureAwait(false);
                if (input == "2")
                    throw new Exception("This exception kills the process");
                return input;
            })
            .Subscribe(s => Console.WriteLine($"SelectMany (TPL Dataflow): {s}"));

        await Task.Delay(TimeSpan.FromMinutes(10)).ConfigureAwait(false);
    }
}

public static class ObservableExtension
{
    public static IObservable<TResult> SelectManyPreserveOrder<TSource, TResult>(this IObservable<TSource> source, Func<TSource, Task<TResult>> selector, int maxParallelBatches = 1)
    {
        return source.FromTplDataflow(() =>
            new TransformBlock<TSource, TResult>(selector,
                new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxParallelBatches }));
    }

    public static IObservable<TResult> FromTplDataflow<T, TResult>(
        this IObservable<T> source, Func<IPropagatorBlock<T, TResult>> blockFactory)
    {
        // Defer so that every subscriber gets its own block.
        return Observable.Defer(() =>
        {
            var block = blockFactory();
            return Observable.Using(() =>
            {
                // Feed the source into the block for the lifetime of the subscription.
                var disposable = source.Subscribe(block.AsObserver());
                return Disposable.Create(dispose: () => disposable.Dispose());
            }, r => block.AsObservable());
        });
    }
}

public static class Helper
{
    public static IObservable<string> ConsoleInput()
    {
        return
            Observable
                .FromAsync(() => Console.In.ReadLineAsync())
                .Repeat()
                .Publish()
                .RefCount()
                .SubscribeOn(Scheduler.Default);
    }
}
Interestingly, the UnobservedTaskException handler is never called.
The exception is being thrown here, but it is thrown in an unobserved Task continuation. In .NET 4.5 and above, unobserved task exceptions are handled by the runtime automatically. Here's a good article by Stephen Toub talking about that change.
The important bit:
To make it easier for developers to write asynchronous code based on Tasks, .NET 4.5 changes the default exception behavior for unobserved exceptions. While unobserved exceptions will still cause the UnobservedTaskException event to be raised (not doing so would be a breaking change), the process will not crash by default. Rather, the exception will end up getting eaten after the event is raised, regardless of whether an event handler observes the exception.
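To see the quoted behavior in isolation, here is a minimal sketch with no Rx involved. It assumes the default runtime configuration (the legacy crash-on-unobserved-exception behavior has not been switched back on), and the names UnobservedContinuationDemo and RunFaultingContinuation are made up for illustration. A continuation throws, nobody observes the resulting task, and the process carries on. The UnobservedTaskException event is raised only once the faulted task has been finalized, which is one plausible reason a handler appears never to run in a short experiment; the forced collections below are only there to provoke it and are not guaranteed to succeed.

```csharp
using System;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class UnobservedContinuationDemo
{
    // Runs a continuation that throws, waits for it to finish without
    // observing its exception, and returns only its final status.
    // Kept in a separate non-inlined method so that no local variable
    // keeps the faulted task reachable afterwards.
    [MethodImpl(MethodImplOptions.NoInlining)]
    public static TaskStatus RunFaultingContinuation()
    {
        Action<Task<string>> body = t =>
        {
            throw new InvalidOperationException("thrown inside a continuation");
        };
        Task continuation = Task.FromResult("1").ContinueWith(body);

        // Waiting on the handle does not observe the exception
        // (unlike Wait(), Result, or reading the Exception property).
        ((IAsyncResult)continuation).AsyncWaitHandle.WaitOne();
        return continuation.Status;
    }

    public static void Main()
    {
        var raised = new ManualResetEventSlim(false);
        TaskScheduler.UnobservedTaskException += (sender, args) =>
        {
            Console.WriteLine("Unobserved task exception: " + args.Exception.InnerException.Message);
            raised.Set();
        };

        // The continuation faults, yet nothing propagates to this thread.
        Console.WriteLine("Continuation status: " + RunFaultingContinuation());

        // The event fires from the finalizer thread, so try to force finalization.
        for (int i = 0; i < 10 && !raised.IsSet; i++)
        {
            GC.Collect();
            GC.WaitForPendingFinalizers();
            raised.Wait(100);
        }

        Console.WriteLine(raised.IsSet
            ? "Handler ran; the process is still alive."
            : "Handler has not run yet; the process is still alive.");
    }
}
```

The continuation should report a status of Faulted, and in either branch the program reaches its last line instead of terminating. Whether the handler message shows up depends on when the garbage collector finalizes the task.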