抛出可观察的 LINQ 不一致异常

Observable LINQ inconsistent exceptions thrown

在我编写股票市场交易员的过程中 IObserver 我遇到了三个错误,这些错误大多来自 Reactive Extensions 库。

我有以下 CompanyInfo class:

public class CompanyInfo
{
    public string Name { get; set; }

    public double Value { get; set; }
}

还有一个 IObservable<CompanyInfo> 叫做 StockMarket:

public class StockMarket : IObservable<CompanyInfo>

我的 Observer 如下所示:

public class StockTrader : IObserver<CompanyInfo>
{
    public void OnCompleted()
    {
        Console.WriteLine("Market Closed");
    }

    public void OnError(Exception error)
    {
        Console.WriteLine(error);
    }

    public void OnNext(CompanyInfo value)
    {
        WriteStock(value);
    }

    private void WriteStock(CompanyInfo value) { ... }
}

我运行以下代码:

StockMarket market = GetStockMarket();
StockTrader trader = new StockTrader();

IObservable<CompanyInfo> differential = market  //[F, 1], [S, 5], [S, 4], [F, 2]
    .GroupBy(x => x.Name)                       //[F, 1], [F, 2]; [S, 5], [S, 4]
    .SelectMany(x => x                  //4, 8, 2, 3
        .Buffer(2, 1)                   //(4, 8), (8, 2), (2, 3), (3)
        .SkipLast(1)                    //(4, 8), (8, 2), (2, 3)
        .Select(y => new CompanyInfo    //(+100%), (-75%), (+50%)
        {
            Name = x.Key,
            Value = (y[1].Value - y[0].Value) / y[0].Value
        })                                      //[F, +100%]; [S, -20%]
    );

using (IDisposable subscription = differential.Subscribe(trader))
{
    Observable.Wait(market);
}

出现三个错误之一:

是什么导致了这些奇怪的症状?

关于 Reactive Extensions 概念的最伟大的事情之一是能够订阅发生的 'occurrence' (IObservable) 'somewhere' 并在此 'occurrence' 上应用面向对象的概念 - 这无需知道 在哪里'somewhere'是.

这样 Reactive Extensions 简化了面向 event 的编程和 producer-consumer problems 很多

在不知道观察到的数据来源的情况下订阅IObservable的能力迫使订阅者假设通知是不可预测的.换句话说,在观察 IObservable 时,您应该假设通知可以同时传送 .

由于 Reactive Externsions 的行为契约,IObservables 应该一次生产一件商品。通常情况就是这样,但有时外部实施不遵循该合同。

让我们分别看一下这三个问题:

GroupBy 不是线程安全的


GroupBy 通过返回一个 IObservable<IGroupedObservable<T>> 来工作,它的 OnNext 方法使用 IGroupedObservable<T> 调用外部 IObservableOnNext匹配当前通知。它通过为 Dictionary 中的每个键保留一个 IGroupedObservable<T> (更准确地说是一个 Subject<T> 来实现 - 这并不奇怪 - 不是 ConcurrentDictionary。这意味着 两个接近的通知可能导致双重插入.

Select不孤单


Select 的线程安全由其提供的委托决定。在上面的例子中,提供给 Select 方法的委托依赖于 Buffer(2, 1) 将提供大小为 2 的列表这一事实。 Buffer 包含一个 Queue,它不是并发的 ,因此 当从多个线程迭代时 - BufferQueue可以给我们一些意想不到的结果.

另一个可能因相同原因抛出的 ExceptionNullReferenceException if y would be provided null, or an InvalidOperationException 因为 Queue 可以在迭代时进行修改。

连基本观察都不安全


最后但同样重要的是,即使您只进行基本观察,StockTraderOnNext 方法也会以非 atomic operation 方式修改控制台,这会导致奇怪文本布局。

那你能做什么呢?


Synchronize method exists to make you able to validate that you are subscribing to a linear IObservable<T>这意味着不能同时调用OnNext方法

因为即使 GroupBy 扩展方法也不是线程安全的,所以需要在链的开头调用 Synchronize 方法:

IObservable<CompanyInfo> differential = market  //[F, 1], [S, 5], [S, 4], [F, 2]
    .Synchronize()
    .GroupBy(x => x.Name)                       //[F, 1], [F, 2]; [S, 5], [S, 4]
    .SelectMany(x => x                  //4, 8, 2, 3
        .Buffer(2, 1)                   //(4, 8), (8, 2), (2, 3), (3)
        .SkipLast(1)                    //(4, 8), (8, 2), (2, 3)
        .Select(y => new CompanyInfo    //(+100%), (-75%), (+50%)
        {
            Name = x.Key,
            Value = (y[1].Value - y[0].Value) / y[0].Value
        })                                      
    );                                          //[F, +100%]; [S, -20%]

请注意 Synchronize 向您的查询添加了另一个代理 Observable,因此它会使查询变慢一点,因此 您应该避免使用不需要的时候.

您的代码的问题不在于查询,也不在于 Rx 本身。问题可能来自您实际的 StockMarketStockTrader 实现。

现在,问题很可能是因为您正在为您的 market observable 创建两个订阅。

当你这样写的时候:

using (IDisposable subscription = differential.Subscribe(trader))
{
    Observable.Wait(market);
}

...您将收到两个 market 的订阅。一个在 differential.Subscribe(trader) 中,另一个是因为 Observable.Wait(market);.

我怀疑是两个并发订阅导致了您的问题,但是在没有看到 StockMarket 的实现的情况下,我们无法判断它为什么会抛出错误。

这是实现您自己的可观察和观察者实现的危险。你应该避免这样做。使用标准 Rx 运算符构建的 属性 IObservable<CompanyInfo> CompanyValues { get; } 挂在 CompanyInfo 上会更好。

并且您应该始终避免像 .Wait(...).

这样的阻塞操作

作为快速测试,我会将您当前的 Observable.Wait(market); 替换为具有足够长睡眠时间的 Thread.Sleep(?),以查看您的代码是否正常运行。当然,您需要确保在后台调度程序上生成值(如 Scheduler.Default)。

我运行这个代码来测试你的查询:

public class CompanyInfo
{
    public string Name { get; set; }

    public double Value { get; set; }
}

public class StockTrader : IObserver<CompanyInfo>
{
    public void OnCompleted()
    {
        Console.WriteLine("Market Closed");
    }

    public void OnError(Exception error)
    {
        Console.WriteLine(error);
    }

    public void OnNext(CompanyInfo value)
    {
        WriteStock(value);
    }

    private void WriteStock(CompanyInfo value) { Console.WriteLine($"{value.Name} = {value.Value}"); }
}

public class StockMarket : IObservable<CompanyInfo>
{
    private CompanyInfo[] _values = new CompanyInfo[]
    {
        new CompanyInfo() { Name = "F", Value = 1 },
        new CompanyInfo() { Name = "S", Value = 5 },
        new CompanyInfo() { Name = "S", Value = 4 },
        new CompanyInfo() { Name = "F", Value = 2 },
    };

    public IDisposable Subscribe(IObserver<CompanyInfo> observable)
    {
        return _values.ToObservable().ObserveOn(Scheduler.Default).Subscribe(observable);
    }
}

...有了这个:

StockMarket market = new StockMarket();
StockTrader trader = new StockTrader();

IObservable<CompanyInfo> differential = market  //[F, 1], [S, 5], [S, 4], [F, 2]
    .GroupBy(x => x.Name)                       //[F, 1], [F, 2]; [S, 5], [S, 4]
    .SelectMany(x => x                  //4, 8, 2, 3
        .Buffer(2, 1)                   //(4, 8), (8, 2), (2, 3), (3)
        .SkipLast(1)                    //(4, 8), (8, 2), (2, 3)
        .Select(y => new CompanyInfo    //(+100%), (-75%), (+50%)
        {
            Name = x.Key,
            Value = (y[1].Value - y[0].Value) / y[0].Value
        })                                      //[F, +100%]; [S, -20%]
    );

IDisposable subscription = differential.Subscribe(trader);

Thread.Sleep(10000);

我一次都没有让它崩溃或导致任何异常。