反应式扩展中的无限捕获
Infinite Catch in Reactive Extenions
我的问题与 Rx 和 Catch 运算符有关。假设我的可观察对象有超时,每次发生超时我都想重新创建底层可观察对象(Catch)并做同样的事情(添加超时和捕获)。
下面我粘贴了示例代码。出于此示例的目的,超时始终每 2 秒发生一次。根据我的观察,这段代码不能无限地工作,不知何故,在重新创建之后,某些东西引用了旧的可观察到的剩余物。当调用 Catch 时,那些剩菜会一直累积。
最可疑的是最后一行,存在某种自我引用。但我实际上无法想象自己为什么会出错?还有什么方法可以用类似的逻辑创建可永远工作的可观察对象?
public static IObservable<string> CreateReliableStream(this IObservable<string> targetObservable, Func<IObservable<string>> recreateObservable)
{
return targetObservable
.Timeout(TimeSpan.FromSeconds(2))
.Catch<string, Exception>(exception => ReconnectOnError(exception, recreateObservable));
}
private static IObservable<string> ReconnectOnError(Exception exception, Func<IObservable<string>> recreateObservable)
{
GC.Collect(); // For debug - make sure all unreferenced object are removed
return recreateObservable()
.Timeout(TimeSpan.FromSeconds(2))
.Catch<string, Exception>(ex => ReconnectOnError(ex, recreateObservable));
}
你可以这样做。
//done in Linqpad, where async Main is allowed.
async void Main()
{
var source = new Subject<string>();
var backup = new Subject<string>();
var reliableStream = source.CreateReliableStream(() => backup);
reliableStream.Subscribe(s => Console.WriteLine($"Next: {s}"), e => Console.WriteLine($"Error: {e.Message}"), () => Console.WriteLine("Completed."));
source.OnNext("sourceAbc");
backup.OnNext("backupAbc");
await Task.Delay(TimeSpan.FromSeconds(2.5));
source.OnNext("sourceDef");
backup.OnNext("backupDef");
await Task.Delay(TimeSpan.FromSeconds(2.5));
//Doesn't yield "Completed" because it's re-subscribing.
source.OnCompleted();
backup.OnCompleted();
}
public static class Ex
{
public static IObservable<string> CreateReliableStream(this IObservable<string> targetObservable, Func<IObservable<string>> recreateObservable)
{
return targetObservable
.Timeout(TimeSpan.FromSeconds(2))
.Catch<string, Exception>(exception => ReconnectOnError(exception, recreateObservable));
}
public static IEnumerable<IObservable<T>> InfiniteObservables<T>(Func<IObservable<T>> f)
{
while(true)
yield return f();
}
private static IObservable<string> ReconnectOnError(Exception exception, Func<IObservable<string>> recreateObservable)
{
GC.Collect(); // For debug - make sure all unreferenced object are removed
return InfiniteObservables(recreateObservable)
.Select(o => o.Timeout(TimeSpan.FromSeconds(2)))
.OnErrorResumeNext();
}
}
产生以下输出:
Next: sourceAbc
Next: sourceDef
Next: backupHij
Next: backupLmn
虽然我不喜欢这种方法。 Rx 将错误视为流终止符,而您正试图将它们视为备用消息。你最终会像这样逆流而上。
我认为您只想使用 Retry()
运算符。
我假设您的初始序列与您的后续序列相同。
例如
Observable.Return(1).Concat(Observable.Throw<int>(new Exception()))
.Retry()
这将 运行 进入一个紧密的无限循环。
您的代码最终可能看起来像
createObservable()
.Timeout(TimeSpan.FromSeconds(2))
.Retry()
我的问题与 Rx 和 Catch 运算符有关。假设我的可观察对象有超时,每次发生超时我都想重新创建底层可观察对象(Catch)并做同样的事情(添加超时和捕获)。
下面我粘贴了示例代码。出于此示例的目的,超时始终每 2 秒发生一次。根据我的观察,这段代码不能无限地工作,不知何故,在重新创建之后,某些东西引用了旧的可观察到的剩余物。当调用 Catch 时,那些剩菜会一直累积。
最可疑的是最后一行,存在某种自我引用。但我实际上无法想象自己为什么会出错?还有什么方法可以用类似的逻辑创建可永远工作的可观察对象?
public static IObservable<string> CreateReliableStream(this IObservable<string> targetObservable, Func<IObservable<string>> recreateObservable)
{
return targetObservable
.Timeout(TimeSpan.FromSeconds(2))
.Catch<string, Exception>(exception => ReconnectOnError(exception, recreateObservable));
}
private static IObservable<string> ReconnectOnError(Exception exception, Func<IObservable<string>> recreateObservable)
{
GC.Collect(); // For debug - make sure all unreferenced object are removed
return recreateObservable()
.Timeout(TimeSpan.FromSeconds(2))
.Catch<string, Exception>(ex => ReconnectOnError(ex, recreateObservable));
}
你可以这样做。
//done in Linqpad, where async Main is allowed.
async void Main()
{
var source = new Subject<string>();
var backup = new Subject<string>();
var reliableStream = source.CreateReliableStream(() => backup);
reliableStream.Subscribe(s => Console.WriteLine($"Next: {s}"), e => Console.WriteLine($"Error: {e.Message}"), () => Console.WriteLine("Completed."));
source.OnNext("sourceAbc");
backup.OnNext("backupAbc");
await Task.Delay(TimeSpan.FromSeconds(2.5));
source.OnNext("sourceDef");
backup.OnNext("backupDef");
await Task.Delay(TimeSpan.FromSeconds(2.5));
//Doesn't yield "Completed" because it's re-subscribing.
source.OnCompleted();
backup.OnCompleted();
}
public static class Ex
{
public static IObservable<string> CreateReliableStream(this IObservable<string> targetObservable, Func<IObservable<string>> recreateObservable)
{
return targetObservable
.Timeout(TimeSpan.FromSeconds(2))
.Catch<string, Exception>(exception => ReconnectOnError(exception, recreateObservable));
}
public static IEnumerable<IObservable<T>> InfiniteObservables<T>(Func<IObservable<T>> f)
{
while(true)
yield return f();
}
private static IObservable<string> ReconnectOnError(Exception exception, Func<IObservable<string>> recreateObservable)
{
GC.Collect(); // For debug - make sure all unreferenced object are removed
return InfiniteObservables(recreateObservable)
.Select(o => o.Timeout(TimeSpan.FromSeconds(2)))
.OnErrorResumeNext();
}
}
产生以下输出:
Next: sourceAbc
Next: sourceDef
Next: backupHij
Next: backupLmn
虽然我不喜欢这种方法。 Rx 将错误视为流终止符,而您正试图将它们视为备用消息。你最终会像这样逆流而上。
我认为您只想使用 Retry()
运算符。
我假设您的初始序列与您的后续序列相同。
例如
Observable.Return(1).Concat(Observable.Throw<int>(new Exception()))
.Retry()
这将 运行 进入一个紧密的无限循环。
您的代码最终可能看起来像
createObservable()
.Timeout(TimeSpan.FromSeconds(2))
.Retry()