在单独的 IObservable<Exception> 中吐出 IObservable<T> 异常并正常继续
Spit IObservable<T> exceptions in a separate IObservable<Exception> and continue normally
我有一个热 IObservable<T>
可能会抛出异常。但是,我想继续这样做。我想我可以为此使用 Retry
运算符。但是,如果我也可以通过单独的 IObservable<Exception>
监听 IObservable<T>
中的任何错误,那就太好了。可能吗?
你的情况大大简化了,因为你有一个热观察。
OnError
是您的价值流之外的通知,因此我们可以 具体化 通知以检索错误。这仍然会导致带有 OnCompleted
的流被拆除,因此您需要使用 Repeat
.
重新订阅
var exceptions =
source
.Materialize()
.Where(notif => notif.Kind == NotificationKind.OnError)
.Select(notif => notif.Exception)
.Repeat();
备注
如果您使用 Subject<T>
作为您的热门观察对象,您可能 运行 会陷入重新订阅主题的常见问题。主题将为每个新观察者重播其 OnError
或 OnCompleted
通知。
var source = new Subject<int>();
source.OnNext(1);
source.OnError(new Exception());
source.Subscribe(
i => Console.WriteLine(i),
ex => Console.WriteLine("Still got exception after the throw")
);
在这种情况下,您的异常流将进入无限重新订阅循环。
你的提问前提违反了the observable contract:
An Observable may make zero or more OnNext notifications, each representing a single emitted item, and it may then follow those emission notifications by either an OnCompleted or an OnError notification, but not both. Upon issuing an OnCompleted or OnError notification, it may not thereafter issue any further notifications. (emphasis mine)
换句话说,在你的热 IObservable<T>
抛出异常之后,observable 就结束了。由此产生的可观察异常的最大计数为 1。
如果您想支持在异常后重新启动可观察对象的场景,您正在生成可观察对象流,或 IObservable<IObservable<T>>
。为此,这里有一个代码示例:
var source = new Subject<Subject<int>>();
var exceptionStream = source
.SelectMany(o => o.Materialize())
.Where(n => n.Kind == NotificationKind.OnError)
.Select(n => n.Exception);
var itemStream = source
.SelectMany(o => o.Materialize())
.Where(n => n.Kind == NotificationKind.OnNext)
.Select(n => n.Value);
var items = new List<int>();
var exceptions = new List<Exception>();
itemStream.Subscribe(i => items.Add(i));
exceptionStream.Subscribe(e => exceptions.Add(e));
var currentSubject = new Subject<int>();
source.OnNext(currentSubject);
currentSubject.OnNext(1);
currentSubject.OnNext(2);
currentSubject.OnNext(3);
currentSubject.OnError(new Exception("First error"));
var currentSubject2 = new Subject<int>();
source.OnNext(currentSubject2);
currentSubject2.OnNext(4);
currentSubject2.OnNext(5);
currentSubject2.OnNext(6);
currentSubject2.OnError(new Exception("Second error"));
items.Dump(); //Linqpad
exceptions.Dump(); //Linqpad
我有一个热 IObservable<T>
可能会抛出异常。但是,我想继续这样做。我想我可以为此使用 Retry
运算符。但是,如果我也可以通过单独的 IObservable<Exception>
监听 IObservable<T>
中的任何错误,那就太好了。可能吗?
你的情况大大简化了,因为你有一个热观察。
OnError
是您的价值流之外的通知,因此我们可以 具体化 通知以检索错误。这仍然会导致带有 OnCompleted
的流被拆除,因此您需要使用 Repeat
.
var exceptions =
source
.Materialize()
.Where(notif => notif.Kind == NotificationKind.OnError)
.Select(notif => notif.Exception)
.Repeat();
备注
如果您使用 Subject<T>
作为您的热门观察对象,您可能 运行 会陷入重新订阅主题的常见问题。主题将为每个新观察者重播其 OnError
或 OnCompleted
通知。
var source = new Subject<int>();
source.OnNext(1);
source.OnError(new Exception());
source.Subscribe(
i => Console.WriteLine(i),
ex => Console.WriteLine("Still got exception after the throw")
);
在这种情况下,您的异常流将进入无限重新订阅循环。
你的提问前提违反了the observable contract:
An Observable may make zero or more OnNext notifications, each representing a single emitted item, and it may then follow those emission notifications by either an OnCompleted or an OnError notification, but not both. Upon issuing an OnCompleted or OnError notification, it may not thereafter issue any further notifications. (emphasis mine)
换句话说,在你的热 IObservable<T>
抛出异常之后,observable 就结束了。由此产生的可观察异常的最大计数为 1。
如果您想支持在异常后重新启动可观察对象的场景,您正在生成可观察对象流,或 IObservable<IObservable<T>>
。为此,这里有一个代码示例:
var source = new Subject<Subject<int>>();
var exceptionStream = source
.SelectMany(o => o.Materialize())
.Where(n => n.Kind == NotificationKind.OnError)
.Select(n => n.Exception);
var itemStream = source
.SelectMany(o => o.Materialize())
.Where(n => n.Kind == NotificationKind.OnNext)
.Select(n => n.Value);
var items = new List<int>();
var exceptions = new List<Exception>();
itemStream.Subscribe(i => items.Add(i));
exceptionStream.Subscribe(e => exceptions.Add(e));
var currentSubject = new Subject<int>();
source.OnNext(currentSubject);
currentSubject.OnNext(1);
currentSubject.OnNext(2);
currentSubject.OnNext(3);
currentSubject.OnError(new Exception("First error"));
var currentSubject2 = new Subject<int>();
source.OnNext(currentSubject2);
currentSubject2.OnNext(4);
currentSubject2.OnNext(5);
currentSubject2.OnNext(6);
currentSubject2.OnError(new Exception("Second error"));
items.Dump(); //Linqpad
exceptions.Dump(); //Linqpad