使用 C# ReactiveX 将错误映射到可观察对象

Map errors to observable using C# ReactiveX

我有一个 observable MyObservable<Object> 可以抛出 CustomExceptions where

private class CustomException : Exception

我想做的是将 CustomExceptions 转换为对象并在新的可观察对象中发出它们。

到目前为止,这是我的解决方案,但我想知道是否可以在不必直接调用 Subject 的 onNext、onCompleted 或 onError 方法的情况下完成此操作。

var MySubject = new Subject<NewObject>();

MyObservable.Catch<Object, CustomException>(
            ex =>
            {
                NewObject o = new NewObject(ex.Message);
                MySubject.OnNext(o);
                return Observable.Empty<Object>();
            });

IObservable<IList<NewObject>> listObservable = MySubject.ToList();

编辑:谢谢ibebbs!工作得很好!

您可以使用 Materialize() 函数捕获和映射没有主题的异常,如下所示:

var errorObservable = source
    .Select(projection)
    .Materialize()
    .Where(notification => notification.Kind == NotificationKind.OnError)
    .Select(notification => notification.Exception)
    .OfType<CustomException>()
    .Select(exception => new NewObject(exception.Message));

Materialize 函数接受一个 IObservable<T> 并将其映射到一个 IObservable<Notification<T>>,其中每个 Notification 都有一个 Kind of OnNextOnErrorOnComplete。上面的 observable 只是简单地寻找带有 Kind`` of OnError and with the Exception being an instance of CustomException then projects these exceptions into anIObservable``` 的 Notifications。

这是一个单元测试,显示了这个工作:

[Fact]
public void ShouldEmitErrorsToObservable()
{
    Subject<int> source = new Subject<int>();
    List<int> values = new List<int>();
    List<NewObject> errors = new List<NewObject>();

    Func<int, int> projection =
        value =>
        {
            if (value % 2 == 1) throw new CustomException("Item is odd");

            return value;
        };

    Func<CustomException, IObservable<int>> catcher = null;

    catcher = ex => source.Select(projection).Catch(catcher);

    var errorObservable = source
        .Select(projection)
        .Materialize()
        .Where(notification => notification.Kind == NotificationKind.OnError)
        .Select(notification => notification.Exception)
        .OfType<CustomException>()
        .Select(exception => new NewObject(exception.Message));

    var normalSubscription = source.Select(projection).Catch(catcher).Subscribe(values.Add);
    var errorSubscription = errorObservable.Subscribe(errors.Add);

    source.OnNext(0);
    source.OnNext(1);
    source.OnNext(2);

    Assert.Equal(2, values.Count);
    Assert.Equal(1, errors.Count);
}

然而,正如您在上面使用的解释捕获机制中看到的那样,Rx 中的异常处理可能很难正确处理,甚至更难以优雅地处理。相反,请考虑 Exceptions should be Exceptional 并且,如果您期望出现 class 错误以便为它编写自定义异常,那么该错误并不是真正的异常,而是必须处理的流程的一部分这些错误。

在这种情况下,我建议将可观察对象投影到体现 "try this operation and record the result, be it a value or an exception" 的 class 中,并在执行链中进一步使用它。

在下面的示例中,我使用 "Fallible" class 来捕获操作的结果或异常,然后订阅一个 "Fallible" 实例流,将错误与值。正如您将看到的,代码更简洁且性能更好,因为错误和值共享对基础源的单个订阅:

internal class Fallible
{
    public static Fallible<TResult> Try<TResult, TException>(Func<TResult> action) where TException : Exception
    {
        try
        {
            return Success(action());
        }
        catch (TException exception)
        {
            return Error<TResult>(exception);
        }
    }

    public static Fallible<T> Success<T>(T value)
    {
        return new Fallible<T>(value);
    }

    public static Fallible<T> Error<T>(Exception exception)
    {
        return new Fallible<T>(exception);
    }
}

internal class Fallible<T>
{
    public Fallible(T value)
    {
        Value = value;
        IsSuccess = true;
    }

    public Fallible(Exception exception)
    {
        Exception = exception;
        IsError = true;
    }

    public T Value { get; private set; }
    public Exception Exception { get; private set; }
    public bool IsSuccess { get; private set; }
    public bool IsError { get; private set; }
}

[Fact]
public void ShouldMapErrorsToFallible()
{
    Subject<int> source = new Subject<int>();
    List<int> values = new List<int>();
    List<NewObject> errors = new List<NewObject>();

    Func<int, int> projection =
        value =>
        {
            if (value % 2 == 1) throw new CustomException("Item is odd");

            return value;
        };

    var observable = source
        .Select(value => Fallible.Try<int, CustomException>(() => projection(value)))
        .Publish()
        .RefCount();

    var errorSubscription = observable
        .Where(fallible => fallible.IsError)
        .Select(fallible => new NewObject(fallible.Exception.Message))
        .Subscribe(errors.Add);

    var normalSubscription = observable
        .Where(fallible => fallible.IsSuccess)
        .Select(fallible => fallible.Value)
        .Subscribe(values.Add);

    source.OnNext(0);
    source.OnNext(1);
    source.OnNext(2);

    Assert.Equal(2, values.Count);
    Assert.Equal(1, errors.Count);
}

希望对您有所帮助。