C# Rx - 忽略错误

C# Rx - ignoring error

我有独立的事件流,这些事件是用 Reactive 扩展异步处理的。处理程序可能因任何原因失败,但流继续。

然而,在Rx中,错误发生后,它会自动退订。这是可配置的吗?

示例:

async Task<Unit> ActionAsync(int i)
{
    if (i > 1)
        throw new Exception();

    i.Dump();       
    return Unit.Default;
}

void Main()
{
    var sb = new Subject<int>();

    sb.SelectMany(ActionAsync).Subscribe(
        _ => { },
        ex =>
        {
            ex.Dump();
        }
    );


    sb.OnNext(1);
    sb.OnNext(2);
    sb.OnNext(3);
}

我想要以下输出:

如果 ActionAsync 中没有 try/catching,我可以实现吗?

在 Rx 中有一个行为契约,其中一个流只能是 OnNext*(OnError|OnCompleted)。换句话说,零个或多个 OnNext,最后只有 OnErrorOnCompleted 之一。

所以,不,你不能配置 Rx。如果你这样做,它将不再是 Rx。

但是,您可以编写一个可以重试源的查询。

如果你这样写代码:

async Task<int> ActionAsync(int i)
{
    if (i == 2)
        throw new Exception();

    return i;
}

void Main()
{
    var sb = new Subject<int>();

    sb
        .SelectMany(ActionAsync)
        .Do(_ => { }, ex => ex.Dump())
        .Retry()
        .Subscribe(_ => _.Dump());

    sb.OnNext(1);
    sb.OnNext(2);
    sb.OnNext(3);
}

然后你会得到:

1
Exception of type 'System.Exception' was thrown. 
3

根据您询问性能问题的评论,使用 .Retry() 没有任何性能问题,但存在行为问题。

如果源是冷的 - 如 var sb = new [] { 1, 2, 3 }.ToObservable(); - 那么 .Retry() 将再次从整个可观察序列开始并导致无限序列:

1
Exception of type 'System.Exception' was thrown. 
1
Exception of type 'System.Exception' was thrown. 
1
Exception of type 'System.Exception' was thrown. 
1
Exception of type 'System.Exception' was thrown. 
1
Exception of type 'System.Exception' was thrown. 
1
Exception of type 'System.Exception' was thrown. 
...

在您的代码中,可观察对象是热可观察对象,因此不会发生这种情况。

如果你想在冷的 Observable 上执行此操作,你需要通过 .Publish(...) 使其变热。像这样:

var sb = new[] { 1, 2, 3 }.ToObservable();

sb
    .Publish(sbp =>
        sbp
            .SelectMany(ActionAsync)
            .Do(_ => { }, ex => ex.Dump())
            .Retry())
    .Subscribe(_ => _.Dump());

然后是预期的行为 returns。

使用Materialize

    async Task<Unit> ActionAsync(int i)
    {
        if (i > 1)
            throw new Exception();

        i.Dump();
        return Unit.Default;
    }

    void Main()
    {
        var sb = new Subject<int>();
        sb.SelectMany(i => Observable.FromAsync(() => ActionAsync(i)).Materialize())
            .Subscribe(item =>
            {
                if (item.Kind == NotificationKind.OnError)
                {
                    item.Exception.Dump();
                }
                //else if (item.Kind == NotificationKind.OnNext)
                //{
                //    var value = item.Value;
                //}
                //else if (item.Kind == NotificationKind.OnCompleted)
                //{
                //}
            }
        );
        sb.OnNext(1);
        sb.OnNext(2);
        sb.OnNext(3);
    }

请注意,如果核心逻辑保持不变,您将永远不会得到输出“3”,因为 (i > 1) 有例外,其中包括 3。您可能想将 (i > 1) 更改为 (i == 2) 得到题中指定的输出。