可观察管道中的异常处理

Exception handling in observable pipeline

我创建了一个 Observable,它由一个项目组成,该项目通过 运行 异步方法转换为另一个项目。

IObservable<Summary> obs = scanner.Scans
                    .SelectMany(b => GetAssignment(b))
                    .SelectMany(b => VerifyAssignment(b))
                    .SelectMany(b => ConfirmAssignmentData(b))
                    .SelectMany(b => UploadAsset(b))
                    .Select(assignment => new Summary())
                    .Catch(LogException());

我想做这个防故障,所以万一在处理过程中抛出异常,我应该记录异常,但忽略异常并继续下一次扫描([=推送的下一个项目) 11=])

当前代码捕获任何异常,但一旦抛出异常,序列就结束了。

我怎样才能让它“吞下”异常(记录它),但继续下一个项目?

这个问题假设存在一个根本性的误解:根据 Observable 契约,一个行为良好的 Observable 在 OnError 通知后终止。对于您的情况,没有“仅登录并继续”选项,因为没有什么可以继续的。通过 OnError 抛出异常的 observable 已经完成,kaput,finito,永远消失了。

一条评论提到了 Retry,这可能适用:如果你有一个像这样的可观察管道:

someHotSource
    .SelectMany(e => f(e)) //operator1
    .SelectMany(e => g(e)) //operator2
    .Subscribe(e => {});

那么其中一个运算符可能会发生异常,从而终止管道,但源可能仍然存在。 Retry 然后将尝试重新创建具有相同功能的新管道。

您可以尝试使用 MaterializeDematerialize 来 'cheat' Observable 合约,但您会逆流而上。作弊的诀窍是确保管道的任何部分都看不到 'raw' OnError,因为该运算符将终止。而是 MaterializeOnError 变成 Notification,它不会爆炸。看起来像这样:

给定这样一个行为良好的管道:

var someHotSource = new Subject<int>();
var f = new Func<int, IObservable<int>>(i => Observable.Return(i));
var g = new Func<int, IObservable<int>>(i =>
{
    if(i % 13 == 0)
        return Observable.Throw<int>(new Exception());
    return Observable.Return(i);
});

var LogException = new Action<Exception>(e => Console.WriteLine("Exception"));
var p1 = someHotSource
    .SelectMany(e => f(e)) //operator1
    .SelectMany(e => g(e)) //operator2: suspect
    .Subscribe(e => Console.WriteLine(e));

...你可以这样作弊:

var p2 = someHotSource
    .SelectMany(e => f(e)) //operator1
    .SuspectSelectMany(e => g(e), LogException) //operator2: suspect
    .Subscribe(e => Console.WriteLine(e));

public static class X
{
    public static IObservable<Notification<T>> IgnoreOnCompleted<T>(this IObservable<Notification<T>> source)
    {
        return source
            .SelectMany(n => n.Kind == NotificationKind.OnCompleted
                ? Observable.Empty<Notification<T>>()
                : Observable.Return(n)
            );
    }
    
    public static IObservable<U> SuspectSelectMany<T, U>(this IObservable<T> source, Func<T, IObservable<U>> selector, Action<Exception> handler)
    {
        var x = source
            .Materialize()
            .SelectMany(e => selector(e.Value).Materialize().IgnoreOnCompleted()) //execute suspect selector, turn immediately into notifications
            .SelectMany(e =>
            {
                if (e.Kind == NotificationKind.OnError)
                {
                    handler(e.Exception);
                    return Observable.Empty<Notification<U>>();
                }
                else
                    return Observable.Return(e);
            }) //error logging/suppression
            .Dematerialize();
        return x;
    }
}

然后给出如下运行器代码:

someHotSource.OnNext(1);
someHotSource.OnNext(12);
someHotSource.OnNext(13);
someHotSource.OnNext(15);

p1会炸。 p2 将产生以下输出:

1
12
Exception
15

Rx 是一种函数式范式,因此使用函数式方法来解决这个问题非常有用。

答案是引入另一个可以处理错误的 monad,例如 Nullable<T> 可以处理具有空值的整数,但在这种情况下 class 可以表示一个值或一个例外。

public class Exceptional
{
    public static Exceptional<T> From<T>(T value) => new Exceptional<T>(value);
    public static Exceptional<T> From<T>(Exception ex) => new Exceptional<T>(ex);
    public static Exceptional<T> From<T>(Func<T> factory) => new Exceptional<T>(factory);
}

public class Exceptional<T>
{
    public bool HasException { get; private set; }
    public Exception Exception { get; private set; }
    public T Value { get; private set; }

    public Exceptional(T value)
    {
        this.HasException = false;
        this.Value = value;
    }

    public Exceptional(Exception exception)
    {
        this.HasException = true;
        this.Exception = exception;
    }

    public Exceptional(Func<T> factory)
    {
        try
        {
            this.Value = factory();
            this.HasException = false;
        }
        catch (Exception ex)
        {
            this.Exception = ex;
            this.HasException = true;
        }
    }

    public override string ToString() =>
        this.HasException
            ? this.Exception.GetType().Name
            : (this.Value != null ? this.Value.ToString() : "null");
}


public static class ExceptionalExtensions
{
    public static Exceptional<T> ToExceptional<T>(this T value) => Exceptional.From(value);

    public static Exceptional<T> ToExceptional<T>(this Func<T> factory) => Exceptional.From(factory);

    public static Exceptional<U> Select<T, U>(this Exceptional<T> value, Func<T, U> m) =>
        value.SelectMany(t => Exceptional.From(() => m(t)));

    public static Exceptional<U> SelectMany<T, U>(this Exceptional<T> value, Func<T, Exceptional<U>> k) =>
        value.HasException ? Exceptional.From<U>(value.Exception) : k(value.Value);

    public static Exceptional<V> SelectMany<T, U, V>(this Exceptional<T> value, Func<T, Exceptional<U>> k, Func<T, U, V> m) =>
        value.SelectMany(t => k(t).SelectMany(u => Exceptional.From(() => m(t, u))));
}

那么,让我们从创建一个抛出异常的 Rx 查询开始。

IObservable<int> query =
    Observable
        .Range(0, 10)
        .Select(x => 5 - x)
        .Select(x => 100 / x)
        .Select(x => x + 5);

如果我 运行 observable 我得到这个:

让我们用with Exceptional 转换它,看看它如何让我们在发生错误时继续处理。

IObservable<Exceptional<int>> query =
    Observable
        .Range(0, 10)
        .Select(x => x.ToExceptional())
        .Select(x => x.Select(y => 5 - y))
        .Select(x => x.Select(y => 100 / y))
        .Select(x => x.Select(y => y + 5));

现在当我 运行 它时,我得到了这个:

现在我可以测试每个结果,看看 HasException 是否为 true 并记录每个异常,同时观察继续。

最后,通过引入一种进一步的扩展方法,很容易清理查询,使其看起来与原始查询几乎相同。

    public static IObservable<Exceptional<U>> Select<T, U>(this IObservable<Exceptional<T>> source, Func<T, U> m) =>
        source.Select(x => x.SelectMany(y => Exceptional.From(() => m(y))));

这将 observables 和 exceptionals 组合到一个 Select 运算符中。

现在查询可以如下所示:

IObservable<Exceptional<int>> query =
    Observable
        .Range(0, 10)
        .Select(x => x.ToExceptional())
        .Select(x => 5 - x)
        .Select(x => 100 / x)
        .Select(x => x + 5);

我得到了与之前相同的结果。


最后,我可以通过添加另外两个扩展方法来使用查询语法来实现这一切:

public static IObservable<Exceptional<U>> SelectMany<T, U>(this IObservable<T> source, Func<T, Exceptional<U>> k) =>
    source.Select(t => k(t));

public static IObservable<Exceptional<V>> SelectMany<T, U, V>(this IObservable<T> source, Func<T, Exceptional<U>> k, Func<T, U, V> m) =>
    source.SelectMany(t => k(t).SelectMany(u => Exceptional.From(() => m(t, u))));

这允许:

IObservable<Exceptional<int>> query =
    from n in Observable.Range(0, 10)
    from x in n.ToExceptional()
    let a = 5 - x
    let b = 100 / a
    select b + 5;

再次,我得到了与以前相同的结果。

您可以使用下面的特定于应用程序的运算符 LogAndIgnoreError

/// <summary>Ensures that the source sequence will always complete successfully.
/// In case of failure the error is logged.</summary>
public static IObservable<T> LogAndIgnoreError<T>(this IObservable<T> source)
{
    return source.Catch((Exception error) =>
    {
        // Application-specific logging
        Console.WriteLine($"Log - {error.GetType().Name}: {error.Message}");
        return Observable.Empty<T>();
    });
}

然后您可以将此运算符附加到任何您想忽略其错误的序列。

用法示例:

IObservable<Summary> obs = scanner.Scans
    .SelectMany(b => GetAssignment(b).LogAndIgnoreError())
    .SelectMany(b => VerifyAssignment(b).LogAndIgnoreError())
    .SelectMany(b => ConfirmAssignmentData(b).LogAndIgnoreError())
    .SelectMany(b => UploadAsset(b).LogAndIgnoreError())
    .Select(assignment => new Summary())
    .LogAndIgnoreError();