捕获错误后如何 select 可观察对象的下一项?

How to select next item of an observable after catching an error?

我正在从 TCP 端点读取数据流。这可能会出错或出现故障,此时我想尝试连接到次要端点。

我想使用 Observable.Repeat 创建无限端点列表,然后 select 使用其中之一创建流。如果出错,我想切换到下一个。我该怎么做?

一些示例代码会有所帮助,这样回答者就不必编造东西并散布在糟糕的电影参考中...

void Main()
{
    var initialObservable = new Subject<int>();
    var observableToContinueWithAfterError = Observable.Return(10);
    var chainedObservable = initialObservable.Catch(observableToContinueWithAfterError);

    chainedObservable.Subscribe(
        i => Console.WriteLine(i), 
        e => Console.WriteLine("Too bad AC ain't in charge no more (this won't happen, Observable.Catch swallows the exception)."),
        () => Console.WriteLine("Keep the change you..."));
    initialObservable.OnNext(1);
    initialObservable.OnNext(2);
    initialObservable.OnError(new Exception());
}

有关 Observable.Catch 的更多信息,请参阅 here

有点麻烦的是在重新订阅发生时保持正在使用哪个源的状态。可能有比使用 IEnumerator 更好的替代方法,但它可能不会使用 Retry 语义。

            //sample source which throws an error after 5 entries
            Func<string, IObservable<string>> sampleSource =
                endpoint =>
                Observable
                .Interval(TimeSpan.FromSeconds(0.5))
                .Select(i => $"{endpoint} : {i + 1}")
                .Take(5)
                .Concat(Observable.Throw<string>(new Exception()));

            //infinite sequence of sources
            var endpoints = new string[] { "source1", "source2" }.Repeat();

            var sequence =
            Observable.Using
            (
                endpoints.GetEnumerator,
                enumerator => Observable.Create<string>(observer =>
                {
                    enumerator.MoveNext();
                    observer.OnNext(enumerator.Current);
                    return Disposable.Empty;
                })
                .SelectMany(sampleSource)
                .Retry()
           );

            sequence.Subscribe(c => Console.WriteLine(c));

            Console.ReadLine();

输出:

source1 : 1
source1 : 2
source1 : 3
source1 : 4
source1 : 5
source2 : 1
source2 : 2
source2 : 3
source2 : 4
source2 : 5
source1 : 1
source1 : 2