将 IObservable<bool> 和 IObservable<Exception> 合并到一个可观察到的 OnErrors 发生异常时

Merging an IObservable<bool> and IObservable<Exception> to one observable that OnErrors when there is an exception

我正在尝试创建单个 Observable,其中 OnNext 流来自一个 observable,而 OnError 流来自另一个 observable。

我这样做的原因是因为我试图包装一个我无法控制的 class 并使用事件来传达其状态。它有两个事件,一个表示已完成 (bool),另一个表示发生异常。

IObservable<Exception> error = Observable.FromEventPattern<ExceptionRoutedEventArgs>(foo, "Failed")
               .Select(x => x.EventArgs.ErrorException);

IObservable<bool> opened = Observable.FromEventPattern<RoutedEventArgs>(foo, "Opened")
               .Select(x => ((Bar)x.Sender).IsOpen);

现在我不能使用标准 Observable.Merge,因为两个可观察对象都有不同的通用参数。但是在伪代码中我想完成这个:

Observable.Merge(opened, error, (op, err) =>
{
    if(op) { return op;}
    if(err != null){return Observable.Throw(err);}
}

上面的代码与任何可能存在的代码都不相似的原因有很多,但我希望意图很明确。

我认为实现此目的的一种方法是使用 Subject<>,但我听说应该避免使用它,因为它在功能概念中引入了状态。我有一个想法,将两个可观察对象组合成一个可观察对象 OnNext 和 OnError 流似乎应该存在 :)

您可以使用 Observable.Create:

var combined = Observable.Create<bool>(o =>
{
    var openSub = opened.Subscribe(o);
    var errorSub = error.Subscribe(o.OnError);
    return new CompositeDisposable(openSub, errorSub);
});

或者您可以将每个 Exception 投影到一个只抛出错误的 IObservable<bool>,然后合并生成的序列:

var combined = opened.Merge(error.SelectMany(Observable.Throw<bool>));

尽管仅仅因为您的源使用异常来控制流程并不意味着您必须这样做 - 为什么不直接将 Exception 投影到 false,然后您就可以得到 true成功和 false 失败:

var combined = opened.Merge(error.Select(_ => false));

试试这个:

var combined =
    opened
        .Materialize()
        .Merge(error.Select(e => Notification.CreateOnError<bool>(e)))
        .Dematerialize()
        .Synchronize();