如何处理异常并用 Rx 重复
How to handle exception and repeat with Rx
我正在使用 Observable.FromAsync
执行 IO
操作。我想重复这个 forever.What 我不明白如何处理异常,对它们做一些处理然后返回在我的循环中:
我尝试过的:
IObservable<string> ioObs=Observable.FromAsync<string>([something]); //at each iteration i do an io operation (reading from a socket);
IObservable<string> loop=Observable.Catch(ioObs).Repeat();
loop.Subscribe(
onNext:x=> Console.Writeline($"Message:{x}"),
onCompleted: Console.Writeline("Completed"),
onError: ex=>Console.Writeline($"Error\tReason:{ex.Message}")
);
现在我不明白为什么我的 observable 在第一个 exception.Am 之后结束,我没有告诉它继续。
我想做的事情:
- 执行 IO 操作
- 如果它抛出 return 一些自定义值
- 重复循环
如果我的 observable 是可枚举的,我会想要这种行为:
public IAsyncEnumerable<string> EnumerableBehaviour()
{
while(true)
{
try
{
string data=await ReadAsync(); //the `FromAsync` delegate
yield return data;
}catch(Exception ex)
yield return "Error";
{
}
}
即使触发了 OnError
,我如何继续执行 Repeat
?
Observable.Catch
和Observable.Throw
应该如何与Observable.Repeat
结合?
一旦抛出异常,observable 就死掉了。这与正常的方法执行相同。当抛出异常时,它会在 try
-catch
块所在的位置进行处理。无法“返回”最初生成异常的位置并从那里继续。与 observables 相同。
Observable.Catch()
扩展方法可用于捕获此类异常(来自现在已死的可观察对象)并继续使用新的可观察对象。您正在使用的重载 Observable.Catch(IObservable<TSource>)
使用 params
参数供 IObservable
对象使用。当第一个失败并出现异常时,跳转到第二个。如果那个失败,跳到第三个,依此类推。但是,您只提供了一个带有 Observable.Catch(ioObs)
的可观察对象,因此当第一个可观察对象失败时,没有什么可跳转到的。
当您使用需要使用可枚举订阅的 other overload 时,这可能是可行的。当一个失败时,将使用下一个。请参阅以下示例:
public static int requestCounter = 0;
public static Task<string> SomeSourceMethod() {
Random r = new Random();
Thread.Sleep(50);
if (r.NextDouble() < 0.5) {
Console.Write("E ");
throw new Exception();
}
return Task.FromResult($"test {requestCounter++}");
}
public static IEnumerable<IObservable<string>> ObservableEnumerable() {
for (;;) {
IObservable<string> foo = Observable.FromAsync<string>(SomeSourceMethod);
IObservable<string> fooRepeated = foo.Repeat();
yield return fooRepeated;
}
}
static void Main(string[] args)
{
IObservable<string> endless = ObservableEnumerable().Catch();
IDisposable subscription = endless
.SubscribeOn(NewThreadScheduler.Default)
.Subscribe(it => Console.WriteLine(it));
Thread.Sleep(1000);
Console.WriteLine("Terminating the subscription");
subscription.Dispose();
}
这可能会生成如下输出:
E test 1
E E E E test 2
test 3
test 4
test 5
test 6
E E test 7
test 8
E test 9
Terminating the subscription
E E E E E E E E E E E E E E E E E E E
但是它不能正常工作,因为我必须努力终止进程才能停止。它有点(?)以某种方式起作用,但我不确定这样做是否是正确的方法(可能不是)。
当你有一个 IObservable<string> ioObs = Observable.FromAsync<string>(Something);
时,你有一个可以 return 一个值然后完成的可观察对象 ({OnNext}{OnCompleted}) 或者你有一个会抛出异常 ({ OnError}).
允许源代码在 returning 值或错误后重复执行非常简单。
IObservable<string> query = ioObs.Retry().Repeat();
.Retry()
表示“如果出现错误,请重试”。 .Repeat()
表示“如果 observable 完成,请再次订阅”。
现在这有点危险,因为您已经生成了一个将连续执行的可观察对象。你需要找到一些方法来阻止它。
您的选择是:
- 销毁订阅
- 取一定数量的值(即
.Take(n)
)
- 设置一个
.Timeout
.
- 或使用
TakeUntil
当您的原始 ioObs
return 完成时为 null 或空字符串时,最后一个很好。
你可以这样做:
IObservable<string> query = ioObs.Retry().Repeat()
.TakeUntil(x => x == null);
这里有一段代码供您试用:
private int __counter = 0;
Task<string> Something()
{
return Task.Run(() =>
{
if (Interlocked.Increment(ref __counter) % 7 == 0)
{
throw new Exception("Blam!");
}
return $"Hello World {__counter}";
});
}
然后这样做:
IObservable<string> ioObs = Observable.FromAsync<string>(Something);
IObservable<string> query = ioObs.Retry().Repeat()
.TakeUntil(x => x.EndsWith("19"));
当我订阅时,我得到:
Hello World 1
Hello World 2
Hello World 3
Hello World 4
Hello World 5
Hello World 6
Hello World 8
Hello World 9
Hello World 10
Hello World 11
Hello World 12
Hello World 13
Hello World 15
Hello World 16
Hello World 17
Hello World 18
Hello World 19
请注意缺少 7
和 14
,因为那是抛出异常的时间。
我正在使用 Observable.FromAsync
执行 IO
操作。我想重复这个 forever.What 我不明白如何处理异常,对它们做一些处理然后返回在我的循环中:
我尝试过的:
IObservable<string> ioObs=Observable.FromAsync<string>([something]); //at each iteration i do an io operation (reading from a socket);
IObservable<string> loop=Observable.Catch(ioObs).Repeat();
loop.Subscribe(
onNext:x=> Console.Writeline($"Message:{x}"),
onCompleted: Console.Writeline("Completed"),
onError: ex=>Console.Writeline($"Error\tReason:{ex.Message}")
);
现在我不明白为什么我的 observable 在第一个 exception.Am 之后结束,我没有告诉它继续。
我想做的事情:
- 执行 IO 操作
- 如果它抛出 return 一些自定义值
- 重复循环
如果我的 observable 是可枚举的,我会想要这种行为:
public IAsyncEnumerable<string> EnumerableBehaviour()
{
while(true)
{
try
{
string data=await ReadAsync(); //the `FromAsync` delegate
yield return data;
}catch(Exception ex)
yield return "Error";
{
}
}
即使触发了 OnError
,我如何继续执行 Repeat
?
Observable.Catch
和Observable.Throw
应该如何与Observable.Repeat
结合?
一旦抛出异常,observable 就死掉了。这与正常的方法执行相同。当抛出异常时,它会在 try
-catch
块所在的位置进行处理。无法“返回”最初生成异常的位置并从那里继续。与 observables 相同。
Observable.Catch()
扩展方法可用于捕获此类异常(来自现在已死的可观察对象)并继续使用新的可观察对象。您正在使用的重载 Observable.Catch(IObservable<TSource>)
使用 params
参数供 IObservable
对象使用。当第一个失败并出现异常时,跳转到第二个。如果那个失败,跳到第三个,依此类推。但是,您只提供了一个带有 Observable.Catch(ioObs)
的可观察对象,因此当第一个可观察对象失败时,没有什么可跳转到的。
当您使用需要使用可枚举订阅的 other overload 时,这可能是可行的。当一个失败时,将使用下一个。请参阅以下示例:
public static int requestCounter = 0;
public static Task<string> SomeSourceMethod() {
Random r = new Random();
Thread.Sleep(50);
if (r.NextDouble() < 0.5) {
Console.Write("E ");
throw new Exception();
}
return Task.FromResult($"test {requestCounter++}");
}
public static IEnumerable<IObservable<string>> ObservableEnumerable() {
for (;;) {
IObservable<string> foo = Observable.FromAsync<string>(SomeSourceMethod);
IObservable<string> fooRepeated = foo.Repeat();
yield return fooRepeated;
}
}
static void Main(string[] args)
{
IObservable<string> endless = ObservableEnumerable().Catch();
IDisposable subscription = endless
.SubscribeOn(NewThreadScheduler.Default)
.Subscribe(it => Console.WriteLine(it));
Thread.Sleep(1000);
Console.WriteLine("Terminating the subscription");
subscription.Dispose();
}
这可能会生成如下输出:
E test 1
E E E E test 2
test 3
test 4
test 5
test 6
E E test 7
test 8
E test 9
Terminating the subscription
E E E E E E E E E E E E E E E E E E E
但是它不能正常工作,因为我必须努力终止进程才能停止。它有点(?)以某种方式起作用,但我不确定这样做是否是正确的方法(可能不是)。
当你有一个 IObservable<string> ioObs = Observable.FromAsync<string>(Something);
时,你有一个可以 return 一个值然后完成的可观察对象 ({OnNext}{OnCompleted}) 或者你有一个会抛出异常 ({ OnError}).
允许源代码在 returning 值或错误后重复执行非常简单。
IObservable<string> query = ioObs.Retry().Repeat();
.Retry()
表示“如果出现错误,请重试”。 .Repeat()
表示“如果 observable 完成,请再次订阅”。
现在这有点危险,因为您已经生成了一个将连续执行的可观察对象。你需要找到一些方法来阻止它。
您的选择是:
- 销毁订阅
- 取一定数量的值(即
.Take(n)
) - 设置一个
.Timeout
. - 或使用
TakeUntil
当您的原始 ioObs
return 完成时为 null 或空字符串时,最后一个很好。
你可以这样做:
IObservable<string> query = ioObs.Retry().Repeat()
.TakeUntil(x => x == null);
这里有一段代码供您试用:
private int __counter = 0;
Task<string> Something()
{
return Task.Run(() =>
{
if (Interlocked.Increment(ref __counter) % 7 == 0)
{
throw new Exception("Blam!");
}
return $"Hello World {__counter}";
});
}
然后这样做:
IObservable<string> ioObs = Observable.FromAsync<string>(Something);
IObservable<string> query = ioObs.Retry().Repeat()
.TakeUntil(x => x.EndsWith("19"));
当我订阅时,我得到:
Hello World 1 Hello World 2 Hello World 3 Hello World 4 Hello World 5 Hello World 6 Hello World 8 Hello World 9 Hello World 10 Hello World 11 Hello World 12 Hello World 13 Hello World 15 Hello World 16 Hello World 17 Hello World 18 Hello World 19
请注意缺少 7
和 14
,因为那是抛出异常的时间。