如何处理异常并用 Rx 重复

How to handle exception and repeat with Rx

我正在使用 Observable.FromAsync 执行 IO 操作。我想重复这个 forever.What 我不明白如何处理异常,对它们做一些处理然后返回在我的循环中:

我尝试过的:

IObservable<string> ioObs=Observable.FromAsync<string>([something]);  //at each iteration i do an io operation (reading from a socket);
IObservable<string> loop=Observable.Catch(ioObs).Repeat();
loop.Subscribe(
  onNext:x=> Console.Writeline($"Message:{x}"),
  onCompleted: Console.Writeline("Completed"),
  onError: ex=>Console.Writeline($"Error\tReason:{ex.Message}")
);

现在我不明白为什么我的 observable 在第一个 exception.Am 之后结束,我没有告诉它继续。

我想做的事情:

如果我的 observable 是可枚举的,我会想要这种行为:

public IAsyncEnumerable<string> EnumerableBehaviour()
{
   while(true)
   {
      try
      {
        string data=await ReadAsync();  //the `FromAsync` delegate
        yield return data;
      }catch(Exception ex)
          yield return "Error";
      {
   }
}

即使触发了 OnError,我如何继续执行 Repeat

Observable.CatchObservable.Throw应该如何与Observable.Repeat结合?

一旦抛出异常,observable 就死掉了。这与正常的方法执行相同。当抛出异常时,它会在 try-catch 块所在的位置进行处理。无法“返回”最初生成异常的位置并从那里继续。与 observables 相同。

Observable.Catch() 扩展方法可用于捕获此类异常(来自现在已死的可观察对象)并继续使用新的可观察对象。您正在使用的重载 Observable.Catch(IObservable<TSource>) 使用 params 参数供 IObservable 对象使用。当第一个失败并出现异常时,跳转到第二个。如果那个失败,跳到第三个,依此类推。但是,您只提供了一个带有 Observable.Catch(ioObs) 的可观察对象,因此当第一个可观察对象失败时,没有什么可跳转到的。

当您使用需要使用可枚举订阅的 other overload 时,这可能是可行的。当一个失败时,将使用下一个。请参阅以下示例:

public static int requestCounter = 0;
public static Task<string> SomeSourceMethod() {
    Random r = new Random();
    Thread.Sleep(50);
    if (r.NextDouble() < 0.5) {
        Console.Write("E ");
        throw new Exception();
    }
    return Task.FromResult($"test {requestCounter++}");
}
    
public static IEnumerable<IObservable<string>> ObservableEnumerable() {
    for (;;) {
        IObservable<string> foo = Observable.FromAsync<string>(SomeSourceMethod);
        IObservable<string> fooRepeated = foo.Repeat();
        yield return fooRepeated;
    }
}

static void Main(string[] args)
{
    IObservable<string> endless = ObservableEnumerable().Catch();
    IDisposable subscription = endless
        .SubscribeOn(NewThreadScheduler.Default)
        .Subscribe(it => Console.WriteLine(it));

    Thread.Sleep(1000);
    Console.WriteLine("Terminating the subscription");
    subscription.Dispose();
}

这可能会生成如下输出:

E test 1
E E E E test 2
test 3
test 4
test 5
test 6
E E test 7
test 8
E test 9
Terminating the subscription
E E E E E E E E E E E E E E E E E E E 

但是它不能正常工作,因为我必须努力终止进程才能停止。它有点(?)以某种方式起作用,但我不确定这样做是否是正确的方法(可能不是)。

当你有一个 IObservable<string> ioObs = Observable.FromAsync<string>(Something); 时,你有一个可以 return 一个值然后完成的可观察对象 ({OnNext}{OnCompleted}) 或者你有一个会抛出异常 ({ OnError}).

允许源代码在 returning 值或错误后重复执行非常简单。

IObservable<string> query = ioObs.Retry().Repeat();

.Retry() 表示“如果出现错误,请重试”。 .Repeat() 表示“如果 observable 完成,请再次订阅”。

现在这有点危险,因为您已经生成了一个将连续执行的可观察对象。你需要找到一些方法来阻止它。

您的选择是:

  • 销毁订阅
  • 取一定数量的值(即.Take(n)
  • 设置一个.Timeout.
  • 或使用TakeUntil

当您的原始 ioObs return 完成时为 null 或空字符串时,最后一个很好。

你可以这样做:

IObservable<string> query = ioObs.Retry().Repeat()
        .TakeUntil(x => x == null);
        

这里有一段代码供您试用:

private int __counter = 0;
Task<string> Something()
{
    return Task.Run(() =>
    {
        if (Interlocked.Increment(ref __counter) % 7 == 0)
        {
            throw new Exception("Blam!");
        }
        return $"Hello World {__counter}";
    });
}

然后这样做:

IObservable<string> ioObs = Observable.FromAsync<string>(Something);

IObservable<string> query = ioObs.Retry().Repeat()
        .TakeUntil(x => x.EndsWith("19"));
        

当我订阅时,我得到:

Hello World 1 
Hello World 2 
Hello World 3 
Hello World 4 
Hello World 5 
Hello World 6 
Hello World 8 
Hello World 9 
Hello World 10 
Hello World 11 
Hello World 12 
Hello World 13 
Hello World 15 
Hello World 16 
Hello World 17 
Hello World 18 
Hello World 19 

请注意缺少 714,因为那是抛出异常的时间。