如何重试一个动作 OnError reactive X IObservable 几次,如果成功继续
How to retry an action OnError reactive X IObservable several times and if successful continue
我有以下问题:我有一个主题使用方法 OnNext 来通知新事件。但是,可能会发生异常,在下面的代码中使用方法调用 OnError 进行模拟。错误的原因取决于时间,所以如果我尝试再次重复相同的操作几次,它就会成功。所以我想使用 catch 方法,它接受一个生成新序列的函数来重复这个动作,比方说 5 次。如果所有重复尝试都失败,那么它应该抛出一个最终异常,如果至少有一个成功,它应该继续 OnNext(5) 和 OnNext(6)。
理想情况下,应该阻止异常和随后的 OnError 调用,但在我的情况下这是不可能的。
我尝试了多种方案,方法有 Retry、Concat、Catch 等...但没有任何效果如我所愿。
Subject<int> sub = new Subject<int>();
var seq = sub.Select(x =>
{
//time dependent operation
Console.WriteLine(x);
return x;
}).
Catch((SeqException<int> ex) =>
{
return Observable.Empty(0); // what sequece to return to achieve the desired behaviour
});
seq.Subscribe();
sub.OnNext(1);
sub.OnNext(2);
sub.OnNext(3);
sub.OnError(new SeqException<int>{ Value = 4});
sub.OnNext(5);
sub.OnNext(6);
seq.Wait();
提前致谢。
您始终可以使用重试块包装任何内容。只是一个带有一个参数的动作的例子。
Action<T> RetryAction<T>( Action<T> action, int retries )
{
return arg => {
int count = 0;
while ( true )
{
try
{
action( arg );
return;
}
catch
{
if ( count == retries )
throw;
count++;
}
}
}
}
因此您不必更改序列处理的逻辑。
情况1:如果Select
里面的时间依赖操作是你需要重试的。
使用 Observable.Start
将 thunk 转换为 observable。然后你可以使用所有恢复操作符来声明你的行为。
Subject<int> sub = new Subject<int>();
var seq = sub.SelectMany(x =>
Observable.Start(() =>
{
//time dependent failure
if (DateTime.Now.Second % 2 == 0)
throw new Exception();
Console.WriteLine(x);
return x;
})
.Retry(5)
.Catch(Observable.Return(x))
);
//for testing
Observable.Interval(TimeSpan.FromSeconds(1)).Select(x => (int)x).Subscribe(sub);
seq.Wait();
案例 2: 您实际上是通过 Subject
收到信号的。
Rx 协定要求在 OnComplete|OnError 之后不再发生通知。在收到该错误后,主题将处理其所有订阅并拆除管道。您需要将其带到案例 1 中才能使其正常工作。
我有以下问题:我有一个主题使用方法 OnNext 来通知新事件。但是,可能会发生异常,在下面的代码中使用方法调用 OnError 进行模拟。错误的原因取决于时间,所以如果我尝试再次重复相同的操作几次,它就会成功。所以我想使用 catch 方法,它接受一个生成新序列的函数来重复这个动作,比方说 5 次。如果所有重复尝试都失败,那么它应该抛出一个最终异常,如果至少有一个成功,它应该继续 OnNext(5) 和 OnNext(6)。
理想情况下,应该阻止异常和随后的 OnError 调用,但在我的情况下这是不可能的。
我尝试了多种方案,方法有 Retry、Concat、Catch 等...但没有任何效果如我所愿。
Subject<int> sub = new Subject<int>();
var seq = sub.Select(x =>
{
//time dependent operation
Console.WriteLine(x);
return x;
}).
Catch((SeqException<int> ex) =>
{
return Observable.Empty(0); // what sequece to return to achieve the desired behaviour
});
seq.Subscribe();
sub.OnNext(1);
sub.OnNext(2);
sub.OnNext(3);
sub.OnError(new SeqException<int>{ Value = 4});
sub.OnNext(5);
sub.OnNext(6);
seq.Wait();
提前致谢。
您始终可以使用重试块包装任何内容。只是一个带有一个参数的动作的例子。
Action<T> RetryAction<T>( Action<T> action, int retries )
{
return arg => {
int count = 0;
while ( true )
{
try
{
action( arg );
return;
}
catch
{
if ( count == retries )
throw;
count++;
}
}
}
}
因此您不必更改序列处理的逻辑。
情况1:如果Select
里面的时间依赖操作是你需要重试的。
使用 Observable.Start
将 thunk 转换为 observable。然后你可以使用所有恢复操作符来声明你的行为。
Subject<int> sub = new Subject<int>();
var seq = sub.SelectMany(x =>
Observable.Start(() =>
{
//time dependent failure
if (DateTime.Now.Second % 2 == 0)
throw new Exception();
Console.WriteLine(x);
return x;
})
.Retry(5)
.Catch(Observable.Return(x))
);
//for testing
Observable.Interval(TimeSpan.FromSeconds(1)).Select(x => (int)x).Subscribe(sub);
seq.Wait();
案例 2: 您实际上是通过 Subject
收到信号的。
Rx 协定要求在 OnComplete|OnError 之后不再发生通知。在收到该错误后,主题将处理其所有订阅并拆除管道。您需要将其带到案例 1 中才能使其正常工作。