错误结束可观察流的正确方法

Correct way to end an observable stream in error

我有几个 return IObservable 的方法。在所有情况下,我都会设置一个查询,这将导致 returned 可观察对象完成。通常我一直在使用 TakeUntil 扩展方法。我在 TakeUntil 中使用的可观察类型包含一个标志,告诉我是否存在问题。我如何使用它来导致我的 returned observable 以错误结束?我希望允许可观察对象以错误结束的 TakeUntil 重载。

目前我已经将方法修改为 return 一个主题,该主题订阅了查询可观察对象并且还订阅了我在 TakeUntil 中使用的其他可观察对象以调用 OnCompleted 或 OnError。我意识到这是一个糟糕的计划,但我应该怎么做?非常感谢任何帮助。

TakeUntil不同,您必须从三种情况中选择一种,对应OnNextOnErrorOnCompleteted。并且已经有一个内置类型可以在 Notification<T> 中捕获它。

我们需要做的就是将具体通知转换为隐式通知 - 使用 Dematerialize 运算符。

这是一个示例流,如果值 10 出现在流中则抛出,但如果任何值大于等于 9 则完成它。

        var errorAt10 =
            values.Select(value =>
            {
                if (value == 10)
                    return Notification.CreateOnError<long>(new Exception());

                if (value >= 9)
                    return Notification.CreateOnCompleted<long>();

                return Notification.CreateOnNext(value);
            })
            .Dematerialize();

如果需要,我们可以简化它:

    public static IObservable<T> NotifyAs<T>(this IObservable<T> source, Func<T, NotificationKind> choice, Exception exception = default)
    {
        return source.Select(value =>
        {
            switch (choice(value))
            {
                case NotificationKind.OnError:
                    return Notification.CreateOnError<T>(exception ?? new Exception());
                case NotificationKind.OnCompleted:
                    return Notification.CreateOnCompleted<T>();
                default:
                    return Notification.CreateOnNext(value);
            }

        })
        .Dematerialize();
    }

现在您可以将前面的例子重写为:

        var errorAt10 =
            values.NotifyAs(value =>
                value == 10 ? NotificationKind.OnError :
                value >= 9 ? NotificationKind.OnCompleted :
                NotificationKind.OnNext
            );

您真的不需要做任何事情。您要求的内容已经内置。

如果您以此代码开始:

var source = Observable.Interval(TimeSpan.FromSeconds(1.0));
var ender = new Subject<Unit>();
var query = source.TakeUntil(ender);
query.Subscribe(x => Console.WriteLine(x));

然后你只需要为 source observable 调用 ender.OnError(new Exception("My exception")); 以异常结束错误 new Exception("My exception").