如何重试一个动作 OnError reactive X IObservable 几次,如果成功继续

How to retry an action OnError reactive X IObservable several times and if successful continue

我有以下问题:我有一个主题使用方法 OnNext 来通知新事件。但是,可能会发生异常,在下面的代码中使用方法调用 OnError 进行模拟。错误的原因取决于时间,所以如果我尝试再次重复相同的操作几次,它就会成功。所以我想使用 catch 方法,它接受一个生成新序列的函数来重复这个动作,比方说 5 次。如果所有重复尝试都失败,那么它应该抛出一个最终异常,如果至少有一个成功,它应该继续 OnNext(5) 和 OnNext(6)。

理想情况下,应该阻止异常和随后的 OnError 调用,但在我的情况下这是不可能的。

我尝试了多种方案,方法有 Retry、Concat、Catch 等...但没有任何效果如我所愿。

Subject<int> sub = new Subject<int>();

var seq = sub.Select(x =>
{
    //time dependent operation
    Console.WriteLine(x);
    return x;
}).
Catch((SeqException<int> ex) =>
{
    return Observable.Empty(0); // what sequece to return to achieve the desired behaviour
});
seq.Subscribe();

sub.OnNext(1);
sub.OnNext(2);
sub.OnNext(3);
sub.OnError(new SeqException<int>{ Value = 4});
sub.OnNext(5);
sub.OnNext(6);

seq.Wait();

提前致谢。

您始终可以使用重试块包装任何内容。只是一个带有一个参数的动作的例子。

Action<T> RetryAction<T>( Action<T> action, int retries )
{
    return arg => {
        int count = 0;
        while ( true )
        {
            try
            { 
                action( arg );
                return; 
            }
            catch
            { 
                if ( count == retries )
                    throw;
                count++;
            }
        }
    }
}

因此您不必更改序列处理的逻辑。

情况1:如果Select里面的时间依赖操作是你需要重试的。

使用 Observable.Start 将 thunk 转换为 observable。然后你可以使用所有恢复操作符来声明你的行为。

Subject<int> sub = new Subject<int>();      

var seq = sub.SelectMany(x =>            
        Observable.Start(() =>
        {
            //time dependent failure
            if (DateTime.Now.Second % 2 == 0)
                throw new Exception();

            Console.WriteLine(x);
            return x;
        })
        .Retry(5)
        .Catch(Observable.Return(x))
    );

//for testing
Observable.Interval(TimeSpan.FromSeconds(1)).Select(x => (int)x).Subscribe(sub);

seq.Wait();

案例 2: 您实际上是通过 Subject 收到信号的。

Rx 协定要求在 OnComplete|OnError 之后不再发生通知。在收到该错误后,主题将处理其所有订阅并拆除管道。您需要将其带到案例 1 中才能使其正常工作。