可观察管道中的异常处理
Exception handling in observable pipeline
我创建了一个 Observable,它由一个项目组成,该项目通过 运行 异步方法转换为另一个项目。
IObservable<Summary> obs = scanner.Scans
.SelectMany(b => GetAssignment(b))
.SelectMany(b => VerifyAssignment(b))
.SelectMany(b => ConfirmAssignmentData(b))
.SelectMany(b => UploadAsset(b))
.Select(assignment => new Summary())
.Catch(LogException());
我想做这个防故障,所以万一在处理过程中抛出异常,我应该记录异常,但忽略异常并继续下一次扫描([=推送的下一个项目) 11=])
当前代码捕获任何异常,但一旦抛出异常,序列就结束了。
我怎样才能让它“吞下”异常(记录它),但继续下一个项目?
这个问题假设存在一个根本性的误解:根据 Observable 契约,一个行为良好的 Observable 在 OnError
通知后终止。对于您的情况,没有“仅登录并继续”选项,因为没有什么可以继续的。通过 OnError
抛出异常的 observable 已经完成,kaput,finito,永远消失了。
一条评论提到了 Retry
,这可能适用:如果你有一个像这样的可观察管道:
someHotSource
.SelectMany(e => f(e)) //operator1
.SelectMany(e => g(e)) //operator2
.Subscribe(e => {});
那么其中一个运算符可能会发生异常,从而终止管道,但源可能仍然存在。 Retry
然后将尝试重新创建具有相同功能的新管道。
您可以尝试使用 Materialize
和 Dematerialize
来 'cheat' Observable 合约,但您会逆流而上。作弊的诀窍是确保管道的任何部分都看不到 'raw' OnError
,因为该运算符将终止。而是 Materialize
将 OnError
变成 Notification
,它不会爆炸。看起来像这样:
给定这样一个行为良好的管道:
var someHotSource = new Subject<int>();
var f = new Func<int, IObservable<int>>(i => Observable.Return(i));
var g = new Func<int, IObservable<int>>(i =>
{
if(i % 13 == 0)
return Observable.Throw<int>(new Exception());
return Observable.Return(i);
});
var LogException = new Action<Exception>(e => Console.WriteLine("Exception"));
var p1 = someHotSource
.SelectMany(e => f(e)) //operator1
.SelectMany(e => g(e)) //operator2: suspect
.Subscribe(e => Console.WriteLine(e));
...你可以这样作弊:
var p2 = someHotSource
.SelectMany(e => f(e)) //operator1
.SuspectSelectMany(e => g(e), LogException) //operator2: suspect
.Subscribe(e => Console.WriteLine(e));
public static class X
{
public static IObservable<Notification<T>> IgnoreOnCompleted<T>(this IObservable<Notification<T>> source)
{
return source
.SelectMany(n => n.Kind == NotificationKind.OnCompleted
? Observable.Empty<Notification<T>>()
: Observable.Return(n)
);
}
public static IObservable<U> SuspectSelectMany<T, U>(this IObservable<T> source, Func<T, IObservable<U>> selector, Action<Exception> handler)
{
var x = source
.Materialize()
.SelectMany(e => selector(e.Value).Materialize().IgnoreOnCompleted()) //execute suspect selector, turn immediately into notifications
.SelectMany(e =>
{
if (e.Kind == NotificationKind.OnError)
{
handler(e.Exception);
return Observable.Empty<Notification<U>>();
}
else
return Observable.Return(e);
}) //error logging/suppression
.Dematerialize();
return x;
}
}
然后给出如下运行器代码:
someHotSource.OnNext(1);
someHotSource.OnNext(12);
someHotSource.OnNext(13);
someHotSource.OnNext(15);
p1
会炸。 p2
将产生以下输出:
1
12
Exception
15
Rx 是一种函数式范式,因此使用函数式方法来解决这个问题非常有用。
答案是引入另一个可以处理错误的 monad,例如 Nullable<T>
可以处理具有空值的整数,但在这种情况下 class 可以表示一个值或一个例外。
public class Exceptional
{
public static Exceptional<T> From<T>(T value) => new Exceptional<T>(value);
public static Exceptional<T> From<T>(Exception ex) => new Exceptional<T>(ex);
public static Exceptional<T> From<T>(Func<T> factory) => new Exceptional<T>(factory);
}
public class Exceptional<T>
{
public bool HasException { get; private set; }
public Exception Exception { get; private set; }
public T Value { get; private set; }
public Exceptional(T value)
{
this.HasException = false;
this.Value = value;
}
public Exceptional(Exception exception)
{
this.HasException = true;
this.Exception = exception;
}
public Exceptional(Func<T> factory)
{
try
{
this.Value = factory();
this.HasException = false;
}
catch (Exception ex)
{
this.Exception = ex;
this.HasException = true;
}
}
public override string ToString() =>
this.HasException
? this.Exception.GetType().Name
: (this.Value != null ? this.Value.ToString() : "null");
}
public static class ExceptionalExtensions
{
public static Exceptional<T> ToExceptional<T>(this T value) => Exceptional.From(value);
public static Exceptional<T> ToExceptional<T>(this Func<T> factory) => Exceptional.From(factory);
public static Exceptional<U> Select<T, U>(this Exceptional<T> value, Func<T, U> m) =>
value.SelectMany(t => Exceptional.From(() => m(t)));
public static Exceptional<U> SelectMany<T, U>(this Exceptional<T> value, Func<T, Exceptional<U>> k) =>
value.HasException ? Exceptional.From<U>(value.Exception) : k(value.Value);
public static Exceptional<V> SelectMany<T, U, V>(this Exceptional<T> value, Func<T, Exceptional<U>> k, Func<T, U, V> m) =>
value.SelectMany(t => k(t).SelectMany(u => Exceptional.From(() => m(t, u))));
}
那么,让我们从创建一个抛出异常的 Rx 查询开始。
IObservable<int> query =
Observable
.Range(0, 10)
.Select(x => 5 - x)
.Select(x => 100 / x)
.Select(x => x + 5);
如果我 运行 observable 我得到这个:
让我们用with Exceptional
转换它,看看它如何让我们在发生错误时继续处理。
IObservable<Exceptional<int>> query =
Observable
.Range(0, 10)
.Select(x => x.ToExceptional())
.Select(x => x.Select(y => 5 - y))
.Select(x => x.Select(y => 100 / y))
.Select(x => x.Select(y => y + 5));
现在当我 运行 它时,我得到了这个:
现在我可以测试每个结果,看看 HasException
是否为 true
并记录每个异常,同时观察继续。
最后,通过引入一种进一步的扩展方法,很容易清理查询,使其看起来与原始查询几乎相同。
public static IObservable<Exceptional<U>> Select<T, U>(this IObservable<Exceptional<T>> source, Func<T, U> m) =>
source.Select(x => x.SelectMany(y => Exceptional.From(() => m(y))));
这将 observables 和 exceptionals 组合到一个 Select
运算符中。
现在查询可以如下所示:
IObservable<Exceptional<int>> query =
Observable
.Range(0, 10)
.Select(x => x.ToExceptional())
.Select(x => 5 - x)
.Select(x => 100 / x)
.Select(x => x + 5);
我得到了与之前相同的结果。
最后,我可以通过添加另外两个扩展方法来使用查询语法来实现这一切:
public static IObservable<Exceptional<U>> SelectMany<T, U>(this IObservable<T> source, Func<T, Exceptional<U>> k) =>
source.Select(t => k(t));
public static IObservable<Exceptional<V>> SelectMany<T, U, V>(this IObservable<T> source, Func<T, Exceptional<U>> k, Func<T, U, V> m) =>
source.SelectMany(t => k(t).SelectMany(u => Exceptional.From(() => m(t, u))));
这允许:
IObservable<Exceptional<int>> query =
from n in Observable.Range(0, 10)
from x in n.ToExceptional()
let a = 5 - x
let b = 100 / a
select b + 5;
再次,我得到了与以前相同的结果。
您可以使用下面的特定于应用程序的运算符 LogAndIgnoreError
:
/// <summary>Ensures that the source sequence will always complete successfully.
/// In case of failure the error is logged.</summary>
public static IObservable<T> LogAndIgnoreError<T>(this IObservable<T> source)
{
return source.Catch((Exception error) =>
{
// Application-specific logging
Console.WriteLine($"Log - {error.GetType().Name}: {error.Message}");
return Observable.Empty<T>();
});
}
然后您可以将此运算符附加到任何您想忽略其错误的序列。
用法示例:
IObservable<Summary> obs = scanner.Scans
.SelectMany(b => GetAssignment(b).LogAndIgnoreError())
.SelectMany(b => VerifyAssignment(b).LogAndIgnoreError())
.SelectMany(b => ConfirmAssignmentData(b).LogAndIgnoreError())
.SelectMany(b => UploadAsset(b).LogAndIgnoreError())
.Select(assignment => new Summary())
.LogAndIgnoreError();
我创建了一个 Observable,它由一个项目组成,该项目通过 运行 异步方法转换为另一个项目。
IObservable<Summary> obs = scanner.Scans
.SelectMany(b => GetAssignment(b))
.SelectMany(b => VerifyAssignment(b))
.SelectMany(b => ConfirmAssignmentData(b))
.SelectMany(b => UploadAsset(b))
.Select(assignment => new Summary())
.Catch(LogException());
我想做这个防故障,所以万一在处理过程中抛出异常,我应该记录异常,但忽略异常并继续下一次扫描([=推送的下一个项目) 11=])
当前代码捕获任何异常,但一旦抛出异常,序列就结束了。
我怎样才能让它“吞下”异常(记录它),但继续下一个项目?
这个问题假设存在一个根本性的误解:根据 Observable 契约,一个行为良好的 Observable 在 OnError
通知后终止。对于您的情况,没有“仅登录并继续”选项,因为没有什么可以继续的。通过 OnError
抛出异常的 observable 已经完成,kaput,finito,永远消失了。
一条评论提到了 Retry
,这可能适用:如果你有一个像这样的可观察管道:
someHotSource
.SelectMany(e => f(e)) //operator1
.SelectMany(e => g(e)) //operator2
.Subscribe(e => {});
那么其中一个运算符可能会发生异常,从而终止管道,但源可能仍然存在。 Retry
然后将尝试重新创建具有相同功能的新管道。
您可以尝试使用 Materialize
和 Dematerialize
来 'cheat' Observable 合约,但您会逆流而上。作弊的诀窍是确保管道的任何部分都看不到 'raw' OnError
,因为该运算符将终止。而是 Materialize
将 OnError
变成 Notification
,它不会爆炸。看起来像这样:
给定这样一个行为良好的管道:
var someHotSource = new Subject<int>();
var f = new Func<int, IObservable<int>>(i => Observable.Return(i));
var g = new Func<int, IObservable<int>>(i =>
{
if(i % 13 == 0)
return Observable.Throw<int>(new Exception());
return Observable.Return(i);
});
var LogException = new Action<Exception>(e => Console.WriteLine("Exception"));
var p1 = someHotSource
.SelectMany(e => f(e)) //operator1
.SelectMany(e => g(e)) //operator2: suspect
.Subscribe(e => Console.WriteLine(e));
...你可以这样作弊:
var p2 = someHotSource
.SelectMany(e => f(e)) //operator1
.SuspectSelectMany(e => g(e), LogException) //operator2: suspect
.Subscribe(e => Console.WriteLine(e));
public static class X
{
public static IObservable<Notification<T>> IgnoreOnCompleted<T>(this IObservable<Notification<T>> source)
{
return source
.SelectMany(n => n.Kind == NotificationKind.OnCompleted
? Observable.Empty<Notification<T>>()
: Observable.Return(n)
);
}
public static IObservable<U> SuspectSelectMany<T, U>(this IObservable<T> source, Func<T, IObservable<U>> selector, Action<Exception> handler)
{
var x = source
.Materialize()
.SelectMany(e => selector(e.Value).Materialize().IgnoreOnCompleted()) //execute suspect selector, turn immediately into notifications
.SelectMany(e =>
{
if (e.Kind == NotificationKind.OnError)
{
handler(e.Exception);
return Observable.Empty<Notification<U>>();
}
else
return Observable.Return(e);
}) //error logging/suppression
.Dematerialize();
return x;
}
}
然后给出如下运行器代码:
someHotSource.OnNext(1);
someHotSource.OnNext(12);
someHotSource.OnNext(13);
someHotSource.OnNext(15);
p1
会炸。 p2
将产生以下输出:
1
12
Exception
15
Rx 是一种函数式范式,因此使用函数式方法来解决这个问题非常有用。
答案是引入另一个可以处理错误的 monad,例如 Nullable<T>
可以处理具有空值的整数,但在这种情况下 class 可以表示一个值或一个例外。
public class Exceptional
{
public static Exceptional<T> From<T>(T value) => new Exceptional<T>(value);
public static Exceptional<T> From<T>(Exception ex) => new Exceptional<T>(ex);
public static Exceptional<T> From<T>(Func<T> factory) => new Exceptional<T>(factory);
}
public class Exceptional<T>
{
public bool HasException { get; private set; }
public Exception Exception { get; private set; }
public T Value { get; private set; }
public Exceptional(T value)
{
this.HasException = false;
this.Value = value;
}
public Exceptional(Exception exception)
{
this.HasException = true;
this.Exception = exception;
}
public Exceptional(Func<T> factory)
{
try
{
this.Value = factory();
this.HasException = false;
}
catch (Exception ex)
{
this.Exception = ex;
this.HasException = true;
}
}
public override string ToString() =>
this.HasException
? this.Exception.GetType().Name
: (this.Value != null ? this.Value.ToString() : "null");
}
public static class ExceptionalExtensions
{
public static Exceptional<T> ToExceptional<T>(this T value) => Exceptional.From(value);
public static Exceptional<T> ToExceptional<T>(this Func<T> factory) => Exceptional.From(factory);
public static Exceptional<U> Select<T, U>(this Exceptional<T> value, Func<T, U> m) =>
value.SelectMany(t => Exceptional.From(() => m(t)));
public static Exceptional<U> SelectMany<T, U>(this Exceptional<T> value, Func<T, Exceptional<U>> k) =>
value.HasException ? Exceptional.From<U>(value.Exception) : k(value.Value);
public static Exceptional<V> SelectMany<T, U, V>(this Exceptional<T> value, Func<T, Exceptional<U>> k, Func<T, U, V> m) =>
value.SelectMany(t => k(t).SelectMany(u => Exceptional.From(() => m(t, u))));
}
那么,让我们从创建一个抛出异常的 Rx 查询开始。
IObservable<int> query =
Observable
.Range(0, 10)
.Select(x => 5 - x)
.Select(x => 100 / x)
.Select(x => x + 5);
如果我 运行 observable 我得到这个:
让我们用with Exceptional
转换它,看看它如何让我们在发生错误时继续处理。
IObservable<Exceptional<int>> query =
Observable
.Range(0, 10)
.Select(x => x.ToExceptional())
.Select(x => x.Select(y => 5 - y))
.Select(x => x.Select(y => 100 / y))
.Select(x => x.Select(y => y + 5));
现在当我 运行 它时,我得到了这个:
现在我可以测试每个结果,看看 HasException
是否为 true
并记录每个异常,同时观察继续。
最后,通过引入一种进一步的扩展方法,很容易清理查询,使其看起来与原始查询几乎相同。
public static IObservable<Exceptional<U>> Select<T, U>(this IObservable<Exceptional<T>> source, Func<T, U> m) =>
source.Select(x => x.SelectMany(y => Exceptional.From(() => m(y))));
这将 observables 和 exceptionals 组合到一个 Select
运算符中。
现在查询可以如下所示:
IObservable<Exceptional<int>> query =
Observable
.Range(0, 10)
.Select(x => x.ToExceptional())
.Select(x => 5 - x)
.Select(x => 100 / x)
.Select(x => x + 5);
我得到了与之前相同的结果。
最后,我可以通过添加另外两个扩展方法来使用查询语法来实现这一切:
public static IObservable<Exceptional<U>> SelectMany<T, U>(this IObservable<T> source, Func<T, Exceptional<U>> k) =>
source.Select(t => k(t));
public static IObservable<Exceptional<V>> SelectMany<T, U, V>(this IObservable<T> source, Func<T, Exceptional<U>> k, Func<T, U, V> m) =>
source.SelectMany(t => k(t).SelectMany(u => Exceptional.From(() => m(t, u))));
这允许:
IObservable<Exceptional<int>> query =
from n in Observable.Range(0, 10)
from x in n.ToExceptional()
let a = 5 - x
let b = 100 / a
select b + 5;
再次,我得到了与以前相同的结果。
您可以使用下面的特定于应用程序的运算符 LogAndIgnoreError
:
/// <summary>Ensures that the source sequence will always complete successfully.
/// In case of failure the error is logged.</summary>
public static IObservable<T> LogAndIgnoreError<T>(this IObservable<T> source)
{
return source.Catch((Exception error) =>
{
// Application-specific logging
Console.WriteLine($"Log - {error.GetType().Name}: {error.Message}");
return Observable.Empty<T>();
});
}
然后您可以将此运算符附加到任何您想忽略其错误的序列。
用法示例:
IObservable<Summary> obs = scanner.Scans
.SelectMany(b => GetAssignment(b).LogAndIgnoreError())
.SelectMany(b => VerifyAssignment(b).LogAndIgnoreError())
.SelectMany(b => ConfirmAssignmentData(b).LogAndIgnoreError())
.SelectMany(b => UploadAsset(b).LogAndIgnoreError())
.Select(assignment => new Summary())
.LogAndIgnoreError();