C# Rx - 忽略错误
C# Rx - ignoring error
我有独立的事件流,这些事件是用 Reactive 扩展异步处理的。处理程序可能因任何原因失败,但流继续。
然而,在Rx中,错误发生后,它会自动退订。这是可配置的吗?
示例:
async Task<Unit> ActionAsync(int i)
{
if (i > 1)
throw new Exception();
i.Dump();
return Unit.Default;
}
void Main()
{
var sb = new Subject<int>();
sb.SelectMany(ActionAsync).Subscribe(
_ => { },
ex =>
{
ex.Dump();
}
);
sb.OnNext(1);
sb.OnNext(2);
sb.OnNext(3);
}
我想要以下输出:
- 1
- 异常
- 3
如果 ActionAsync
中没有 try/catching,我可以实现吗?
在 Rx 中有一个行为契约,其中一个流只能是 OnNext*(OnError|OnCompleted)
。换句话说,零个或多个 OnNext
,最后只有 OnError
或 OnCompleted
之一。
所以,不,你不能配置 Rx。如果你这样做,它将不再是 Rx。
但是,您可以编写一个可以重试源的查询。
如果你这样写代码:
async Task<int> ActionAsync(int i)
{
if (i == 2)
throw new Exception();
return i;
}
void Main()
{
var sb = new Subject<int>();
sb
.SelectMany(ActionAsync)
.Do(_ => { }, ex => ex.Dump())
.Retry()
.Subscribe(_ => _.Dump());
sb.OnNext(1);
sb.OnNext(2);
sb.OnNext(3);
}
然后你会得到:
1
Exception of type 'System.Exception' was thrown.
3
根据您询问性能问题的评论,使用 .Retry()
没有任何性能问题,但存在行为问题。
如果源是冷的 - 如 var sb = new [] { 1, 2, 3 }.ToObservable();
- 那么 .Retry()
将再次从整个可观察序列开始并导致无限序列:
1
Exception of type 'System.Exception' was thrown.
1
Exception of type 'System.Exception' was thrown.
1
Exception of type 'System.Exception' was thrown.
1
Exception of type 'System.Exception' was thrown.
1
Exception of type 'System.Exception' was thrown.
1
Exception of type 'System.Exception' was thrown.
...
在您的代码中,可观察对象是热可观察对象,因此不会发生这种情况。
如果你想在冷的 Observable 上执行此操作,你需要通过 .Publish(...)
使其变热。像这样:
var sb = new[] { 1, 2, 3 }.ToObservable();
sb
.Publish(sbp =>
sbp
.SelectMany(ActionAsync)
.Do(_ => { }, ex => ex.Dump())
.Retry())
.Subscribe(_ => _.Dump());
然后是预期的行为 returns。
async Task<Unit> ActionAsync(int i)
{
if (i > 1)
throw new Exception();
i.Dump();
return Unit.Default;
}
void Main()
{
var sb = new Subject<int>();
sb.SelectMany(i => Observable.FromAsync(() => ActionAsync(i)).Materialize())
.Subscribe(item =>
{
if (item.Kind == NotificationKind.OnError)
{
item.Exception.Dump();
}
//else if (item.Kind == NotificationKind.OnNext)
//{
// var value = item.Value;
//}
//else if (item.Kind == NotificationKind.OnCompleted)
//{
//}
}
);
sb.OnNext(1);
sb.OnNext(2);
sb.OnNext(3);
}
请注意,如果核心逻辑保持不变,您将永远不会得到输出“3”,因为 (i > 1) 有例外,其中包括 3。您可能想将 (i > 1) 更改为 (i == 2) 得到题中指定的输出。
我有独立的事件流,这些事件是用 Reactive 扩展异步处理的。处理程序可能因任何原因失败,但流继续。
然而,在Rx中,错误发生后,它会自动退订。这是可配置的吗?
示例:
async Task<Unit> ActionAsync(int i)
{
if (i > 1)
throw new Exception();
i.Dump();
return Unit.Default;
}
void Main()
{
var sb = new Subject<int>();
sb.SelectMany(ActionAsync).Subscribe(
_ => { },
ex =>
{
ex.Dump();
}
);
sb.OnNext(1);
sb.OnNext(2);
sb.OnNext(3);
}
我想要以下输出:
- 1
- 异常
- 3
如果 ActionAsync
中没有 try/catching,我可以实现吗?
在 Rx 中有一个行为契约,其中一个流只能是 OnNext*(OnError|OnCompleted)
。换句话说,零个或多个 OnNext
,最后只有 OnError
或 OnCompleted
之一。
所以,不,你不能配置 Rx。如果你这样做,它将不再是 Rx。
但是,您可以编写一个可以重试源的查询。
如果你这样写代码:
async Task<int> ActionAsync(int i)
{
if (i == 2)
throw new Exception();
return i;
}
void Main()
{
var sb = new Subject<int>();
sb
.SelectMany(ActionAsync)
.Do(_ => { }, ex => ex.Dump())
.Retry()
.Subscribe(_ => _.Dump());
sb.OnNext(1);
sb.OnNext(2);
sb.OnNext(3);
}
然后你会得到:
1 Exception of type 'System.Exception' was thrown. 3
根据您询问性能问题的评论,使用 .Retry()
没有任何性能问题,但存在行为问题。
如果源是冷的 - 如 var sb = new [] { 1, 2, 3 }.ToObservable();
- 那么 .Retry()
将再次从整个可观察序列开始并导致无限序列:
1 Exception of type 'System.Exception' was thrown. 1 Exception of type 'System.Exception' was thrown. 1 Exception of type 'System.Exception' was thrown. 1 Exception of type 'System.Exception' was thrown. 1 Exception of type 'System.Exception' was thrown. 1 Exception of type 'System.Exception' was thrown. ...
在您的代码中,可观察对象是热可观察对象,因此不会发生这种情况。
如果你想在冷的 Observable 上执行此操作,你需要通过 .Publish(...)
使其变热。像这样:
var sb = new[] { 1, 2, 3 }.ToObservable();
sb
.Publish(sbp =>
sbp
.SelectMany(ActionAsync)
.Do(_ => { }, ex => ex.Dump())
.Retry())
.Subscribe(_ => _.Dump());
然后是预期的行为 returns。
async Task<Unit> ActionAsync(int i)
{
if (i > 1)
throw new Exception();
i.Dump();
return Unit.Default;
}
void Main()
{
var sb = new Subject<int>();
sb.SelectMany(i => Observable.FromAsync(() => ActionAsync(i)).Materialize())
.Subscribe(item =>
{
if (item.Kind == NotificationKind.OnError)
{
item.Exception.Dump();
}
//else if (item.Kind == NotificationKind.OnNext)
//{
// var value = item.Value;
//}
//else if (item.Kind == NotificationKind.OnCompleted)
//{
//}
}
);
sb.OnNext(1);
sb.OnNext(2);
sb.OnNext(3);
}
请注意,如果核心逻辑保持不变,您将永远不会得到输出“3”,因为 (i > 1) 有例外,其中包括 3。您可能想将 (i > 1) 更改为 (i == 2) 得到题中指定的输出。