反应式扩展中的无限捕获

Infinite Catch in Reactive Extenions

我的问题与 Rx 和 Catch 运算符有关。假设我的可观察对象有超时,每次发生超时我都想重新创建底层可观察对象(Catch)并做同样的事情(添加超时和捕获)。

下面我粘贴了示例代码。出于此示例的目的,超时始终每 2 秒发生一次。根据我的观察,这段代码不能无限地工作,不知何故,在重新创建之后,某些东西引用了旧的可观察到的剩余物。当调用 Catch 时,那些剩菜会一直累积。

最可疑的是最后一行,存在某种自我引用。但我实际上无法想象自己为什么会出错?还有什么方法可以用类似的逻辑创建可永远工作的可观察对象?

    public static IObservable<string> CreateReliableStream(this IObservable<string> targetObservable, Func<IObservable<string>> recreateObservable)
    {
        return targetObservable
            .Timeout(TimeSpan.FromSeconds(2))
            .Catch<string, Exception>(exception => ReconnectOnError(exception, recreateObservable));
    }

    private static IObservable<string> ReconnectOnError(Exception exception, Func<IObservable<string>> recreateObservable)
    {
        GC.Collect(); // For debug - make sure all unreferenced object are removed

        return recreateObservable()
            .Timeout(TimeSpan.FromSeconds(2))
            .Catch<string, Exception>(ex => ReconnectOnError(ex, recreateObservable));
    }

你可以这样做。

//done in Linqpad, where async Main is allowed.
async void Main() 
{
    var source = new Subject<string>();
    var backup = new Subject<string>();
    var reliableStream = source.CreateReliableStream(() => backup);
    reliableStream.Subscribe(s => Console.WriteLine($"Next: {s}"), e => Console.WriteLine($"Error: {e.Message}"), () => Console.WriteLine("Completed."));

    source.OnNext("sourceAbc");
    backup.OnNext("backupAbc");
    await Task.Delay(TimeSpan.FromSeconds(2.5));

    source.OnNext("sourceDef");
    backup.OnNext("backupDef");
    await Task.Delay(TimeSpan.FromSeconds(2.5));

    //Doesn't yield "Completed" because it's re-subscribing.
    source.OnCompleted();
    backup.OnCompleted();

}

public static class Ex
{
    public static IObservable<string> CreateReliableStream(this IObservable<string> targetObservable, Func<IObservable<string>> recreateObservable)
    {
        return targetObservable
            .Timeout(TimeSpan.FromSeconds(2))
            .Catch<string, Exception>(exception => ReconnectOnError(exception, recreateObservable));
    }

    public static IEnumerable<IObservable<T>> InfiniteObservables<T>(Func<IObservable<T>> f)
    {
        while(true)
            yield return f();
    }

    private static IObservable<string> ReconnectOnError(Exception exception, Func<IObservable<string>> recreateObservable)
    {
        GC.Collect(); // For debug - make sure all unreferenced object are removed

        return InfiniteObservables(recreateObservable)
            .Select(o => o.Timeout(TimeSpan.FromSeconds(2)))
            .OnErrorResumeNext();
    }
}

产生以下输出:

Next: sourceAbc
Next: sourceDef
Next: backupHij
Next: backupLmn

虽然我不喜欢这种方法。 Rx 将错误视为流终止符,而您正试图将它们视为备用消息。你最终会像这样逆流而上。

我认为您只想使用 Retry() 运算符。

我假设您的初始序列与您的后续序列相同。

例如

Observable.Return(1).Concat(Observable.Throw<int>(new Exception()))
    .Retry()

这将 运行 进入一个紧密的无限循环。

您的代码最终可能看起来像

createObservable()
    .Timeout(TimeSpan.FromSeconds(2))
    .Retry()