抛出可观察的 LINQ 不一致异常
Observable LINQ inconsistent exceptions thrown
在我编写股票市场交易员的过程中 IObserver
我遇到了三个错误,这些错误大多来自 Reactive Extensions
库。
我有以下 CompanyInfo
class:
public class CompanyInfo
{
public string Name { get; set; }
public double Value { get; set; }
}
还有一个 IObservable<CompanyInfo>
叫做 StockMarket
:
public class StockMarket : IObservable<CompanyInfo>
我的 Observer
如下所示:
public class StockTrader : IObserver<CompanyInfo>
{
public void OnCompleted()
{
Console.WriteLine("Market Closed");
}
public void OnError(Exception error)
{
Console.WriteLine(error);
}
public void OnNext(CompanyInfo value)
{
WriteStock(value);
}
private void WriteStock(CompanyInfo value) { ... }
}
我运行以下代码:
StockMarket market = GetStockMarket();
StockTrader trader = new StockTrader();
IObservable<CompanyInfo> differential = market //[F, 1], [S, 5], [S, 4], [F, 2]
.GroupBy(x => x.Name) //[F, 1], [F, 2]; [S, 5], [S, 4]
.SelectMany(x => x //4, 8, 2, 3
.Buffer(2, 1) //(4, 8), (8, 2), (2, 3), (3)
.SkipLast(1) //(4, 8), (8, 2), (2, 3)
.Select(y => new CompanyInfo //(+100%), (-75%), (+50%)
{
Name = x.Key,
Value = (y[1].Value - y[0].Value) / y[0].Value
}) //[F, +100%]; [S, -20%]
);
using (IDisposable subscription = differential.Subscribe(trader))
{
Observable.Wait(market);
}
出现三个错误之一:
以下 ArgumentException
从 Reactive Extensions
中抛出 :
System.ArgumentException: An item with the same key has already been added.
at System.ThrowHelper.ThrowArgumentException(ExceptionResource resource)
at System.Collections.Generic.Dictionary`2.Insert(TKey key, TValue value, Boolean add)
at System.Reactive.Linq.Observable.GroupBy'3._.OnNext(TSource value)
以下IndexOutOfRangeException
:
Parameter name: index
at System.ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument argument, ExceptionResource resource)
at System.Collections.Generic.List'1.get_Item(Int32 index)
at StockMarketTests.<>c__DisplayClass0_0.b__2(IList'1 y)
at System.Reactive.Linq.Observable.Select'2._.OnNext(TSource value)
Console
的文字偶尔会有调整(颜色应该一致):
是什么导致了这些奇怪的症状?
关于 Reactive Extensions
概念的最伟大的事情之一是能够订阅发生的 'occurrence' (IObservable
) 'somewhere' 并在此 'occurrence' 上应用面向对象的概念 - 这无需知道 在哪里'somewhere'是.
这样 Reactive Extensions
简化了面向 event
的编程和 producer-consumer
problems 很多 。
在不知道观察到的数据来源的情况下订阅IObservable
的能力迫使订阅者假设通知是不可预测的.换句话说,在观察 IObservable
时,您应该假设通知可以同时传送 .
由于 Reactive Externsions
的行为契约,IObservables
应该一次生产一件商品。通常情况就是这样,但有时外部实施不遵循该合同。
让我们分别看一下这三个问题:
GroupBy
不是线程安全的
GroupBy
通过返回一个 IObservable<IGroupedObservable<T>>
来工作,它的 OnNext
方法使用 IGroupedObservable<T>
调用外部 IObservable
的 OnNext
匹配当前通知。它通过为 Dictionary
中的每个键保留一个 IGroupedObservable<T>
(更准确地说是一个 Subject<T>
) 来实现 - 这并不奇怪 - 不是 ConcurrentDictionary
。这意味着 两个接近的通知可能导致双重插入.
Select
不孤单
Select
的线程安全由其提供的委托决定。在上面的例子中,提供给 Select
方法的委托依赖于 Buffer(2, 1)
将提供大小为 2 的列表这一事实。 Buffer
包含一个 Queue
,它不是并发的 ,因此 当从多个线程迭代时 - Buffer
的 Queue
可以给我们一些意想不到的结果.
另一个可能因相同原因抛出的 Exception
是 NullReferenceException
if y
would be provided null
, or an InvalidOperationException
因为 Queue
可以在迭代时进行修改。
连基本观察都不安全
最后但同样重要的是,即使您只进行基本观察,StockTrader
的 OnNext
方法也会以非 atomic operation 方式修改控制台,这会导致奇怪文本布局。
那你能做什么呢?
Synchronize
method exists to make you able to validate that you are subscribing to a linear IObservable<T>
这意味着不能同时调用OnNext
方法。
因为即使 GroupBy
扩展方法也不是线程安全的,所以需要在链的开头调用 Synchronize
方法:
IObservable<CompanyInfo> differential = market //[F, 1], [S, 5], [S, 4], [F, 2]
.Synchronize()
.GroupBy(x => x.Name) //[F, 1], [F, 2]; [S, 5], [S, 4]
.SelectMany(x => x //4, 8, 2, 3
.Buffer(2, 1) //(4, 8), (8, 2), (2, 3), (3)
.SkipLast(1) //(4, 8), (8, 2), (2, 3)
.Select(y => new CompanyInfo //(+100%), (-75%), (+50%)
{
Name = x.Key,
Value = (y[1].Value - y[0].Value) / y[0].Value
})
); //[F, +100%]; [S, -20%]
请注意 Synchronize
向您的查询添加了另一个代理 Observable
,因此它会使查询变慢一点,因此 您应该避免使用不需要的时候.
您的代码的问题不在于查询,也不在于 Rx 本身。问题可能来自您实际的 StockMarket
或 StockTrader
实现。
现在,问题很可能是因为您正在为您的 market
observable 创建两个订阅。
当你这样写的时候:
using (IDisposable subscription = differential.Subscribe(trader))
{
Observable.Wait(market);
}
...您将收到两个 market
的订阅。一个在 differential.Subscribe(trader)
中,另一个是因为 Observable.Wait(market);
.
我怀疑是两个并发订阅导致了您的问题,但是在没有看到 StockMarket
的实现的情况下,我们无法判断它为什么会抛出错误。
这是实现您自己的可观察和观察者实现的危险。你应该避免这样做。使用标准 Rx 运算符构建的 属性 IObservable<CompanyInfo> CompanyValues { get; }
挂在 CompanyInfo
上会更好。
并且您应该始终避免像 .Wait(...)
.
这样的阻塞操作
作为快速测试,我会将您当前的 Observable.Wait(market);
替换为具有足够长睡眠时间的 Thread.Sleep(?)
,以查看您的代码是否正常运行。当然,您需要确保在后台调度程序上生成值(如 Scheduler.Default
)。
我运行这个代码来测试你的查询:
public class CompanyInfo
{
public string Name { get; set; }
public double Value { get; set; }
}
public class StockTrader : IObserver<CompanyInfo>
{
public void OnCompleted()
{
Console.WriteLine("Market Closed");
}
public void OnError(Exception error)
{
Console.WriteLine(error);
}
public void OnNext(CompanyInfo value)
{
WriteStock(value);
}
private void WriteStock(CompanyInfo value) { Console.WriteLine($"{value.Name} = {value.Value}"); }
}
public class StockMarket : IObservable<CompanyInfo>
{
private CompanyInfo[] _values = new CompanyInfo[]
{
new CompanyInfo() { Name = "F", Value = 1 },
new CompanyInfo() { Name = "S", Value = 5 },
new CompanyInfo() { Name = "S", Value = 4 },
new CompanyInfo() { Name = "F", Value = 2 },
};
public IDisposable Subscribe(IObserver<CompanyInfo> observable)
{
return _values.ToObservable().ObserveOn(Scheduler.Default).Subscribe(observable);
}
}
...有了这个:
StockMarket market = new StockMarket();
StockTrader trader = new StockTrader();
IObservable<CompanyInfo> differential = market //[F, 1], [S, 5], [S, 4], [F, 2]
.GroupBy(x => x.Name) //[F, 1], [F, 2]; [S, 5], [S, 4]
.SelectMany(x => x //4, 8, 2, 3
.Buffer(2, 1) //(4, 8), (8, 2), (2, 3), (3)
.SkipLast(1) //(4, 8), (8, 2), (2, 3)
.Select(y => new CompanyInfo //(+100%), (-75%), (+50%)
{
Name = x.Key,
Value = (y[1].Value - y[0].Value) / y[0].Value
}) //[F, +100%]; [S, -20%]
);
IDisposable subscription = differential.Subscribe(trader);
Thread.Sleep(10000);
我一次都没有让它崩溃或导致任何异常。
在我编写股票市场交易员的过程中 IObserver
我遇到了三个错误,这些错误大多来自 Reactive Extensions
库。
我有以下 CompanyInfo
class:
public class CompanyInfo
{
public string Name { get; set; }
public double Value { get; set; }
}
还有一个 IObservable<CompanyInfo>
叫做 StockMarket
:
public class StockMarket : IObservable<CompanyInfo>
我的 Observer
如下所示:
public class StockTrader : IObserver<CompanyInfo>
{
public void OnCompleted()
{
Console.WriteLine("Market Closed");
}
public void OnError(Exception error)
{
Console.WriteLine(error);
}
public void OnNext(CompanyInfo value)
{
WriteStock(value);
}
private void WriteStock(CompanyInfo value) { ... }
}
我运行以下代码:
StockMarket market = GetStockMarket();
StockTrader trader = new StockTrader();
IObservable<CompanyInfo> differential = market //[F, 1], [S, 5], [S, 4], [F, 2]
.GroupBy(x => x.Name) //[F, 1], [F, 2]; [S, 5], [S, 4]
.SelectMany(x => x //4, 8, 2, 3
.Buffer(2, 1) //(4, 8), (8, 2), (2, 3), (3)
.SkipLast(1) //(4, 8), (8, 2), (2, 3)
.Select(y => new CompanyInfo //(+100%), (-75%), (+50%)
{
Name = x.Key,
Value = (y[1].Value - y[0].Value) / y[0].Value
}) //[F, +100%]; [S, -20%]
);
using (IDisposable subscription = differential.Subscribe(trader))
{
Observable.Wait(market);
}
出现三个错误之一:
以下
ArgumentException
从Reactive Extensions
中抛出 :System.ArgumentException: An item with the same key has already been added. at System.ThrowHelper.ThrowArgumentException(ExceptionResource resource) at System.Collections.Generic.Dictionary`2.Insert(TKey key, TValue value, Boolean add) at System.Reactive.Linq.Observable.GroupBy'3._.OnNext(TSource value)
以下
IndexOutOfRangeException
:Parameter name: index at System.ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument argument, ExceptionResource resource) at System.Collections.Generic.List'1.get_Item(Int32 index) at StockMarketTests.<>c__DisplayClass0_0.b__2(IList'1 y) at System.Reactive.Linq.Observable.Select'2._.OnNext(TSource value)
Console
的文字偶尔会有调整(颜色应该一致):
是什么导致了这些奇怪的症状?
关于 Reactive Extensions
概念的最伟大的事情之一是能够订阅发生的 'occurrence' (IObservable
) 'somewhere' 并在此 'occurrence' 上应用面向对象的概念 - 这无需知道 在哪里'somewhere'是.
这样 Reactive Extensions
简化了面向 event
的编程和 producer-consumer
problems 很多 。
在不知道观察到的数据来源的情况下订阅IObservable
的能力迫使订阅者假设通知是不可预测的.换句话说,在观察 IObservable
时,您应该假设通知可以同时传送 .
由于 Reactive Externsions
的行为契约,IObservables
应该一次生产一件商品。通常情况就是这样,但有时外部实施不遵循该合同。
让我们分别看一下这三个问题:
GroupBy
不是线程安全的
GroupBy
通过返回一个 IObservable<IGroupedObservable<T>>
来工作,它的 OnNext
方法使用 IGroupedObservable<T>
调用外部 IObservable
的 OnNext
匹配当前通知。它通过为 Dictionary
中的每个键保留一个 IGroupedObservable<T>
(更准确地说是一个 Subject<T>
) 来实现 - 这并不奇怪 - 不是 ConcurrentDictionary
。这意味着 两个接近的通知可能导致双重插入.
Select
不孤单
Select
的线程安全由其提供的委托决定。在上面的例子中,提供给 Select
方法的委托依赖于 Buffer(2, 1)
将提供大小为 2 的列表这一事实。 Buffer
包含一个 Queue
,它不是并发的 ,因此 当从多个线程迭代时 - Buffer
的 Queue
可以给我们一些意想不到的结果.
另一个可能因相同原因抛出的 Exception
是 NullReferenceException
if y
would be provided null
, or an InvalidOperationException
因为 Queue
可以在迭代时进行修改。
连基本观察都不安全
最后但同样重要的是,即使您只进行基本观察,StockTrader
的 OnNext
方法也会以非 atomic operation 方式修改控制台,这会导致奇怪文本布局。
那你能做什么呢?
Synchronize
method exists to make you able to validate that you are subscribing to a linear IObservable<T>
这意味着不能同时调用OnNext
方法。
因为即使 GroupBy
扩展方法也不是线程安全的,所以需要在链的开头调用 Synchronize
方法:
IObservable<CompanyInfo> differential = market //[F, 1], [S, 5], [S, 4], [F, 2]
.Synchronize()
.GroupBy(x => x.Name) //[F, 1], [F, 2]; [S, 5], [S, 4]
.SelectMany(x => x //4, 8, 2, 3
.Buffer(2, 1) //(4, 8), (8, 2), (2, 3), (3)
.SkipLast(1) //(4, 8), (8, 2), (2, 3)
.Select(y => new CompanyInfo //(+100%), (-75%), (+50%)
{
Name = x.Key,
Value = (y[1].Value - y[0].Value) / y[0].Value
})
); //[F, +100%]; [S, -20%]
请注意 Synchronize
向您的查询添加了另一个代理 Observable
,因此它会使查询变慢一点,因此 您应该避免使用不需要的时候.
您的代码的问题不在于查询,也不在于 Rx 本身。问题可能来自您实际的 StockMarket
或 StockTrader
实现。
现在,问题很可能是因为您正在为您的 market
observable 创建两个订阅。
当你这样写的时候:
using (IDisposable subscription = differential.Subscribe(trader))
{
Observable.Wait(market);
}
...您将收到两个 market
的订阅。一个在 differential.Subscribe(trader)
中,另一个是因为 Observable.Wait(market);
.
我怀疑是两个并发订阅导致了您的问题,但是在没有看到 StockMarket
的实现的情况下,我们无法判断它为什么会抛出错误。
这是实现您自己的可观察和观察者实现的危险。你应该避免这样做。使用标准 Rx 运算符构建的 属性 IObservable<CompanyInfo> CompanyValues { get; }
挂在 CompanyInfo
上会更好。
并且您应该始终避免像 .Wait(...)
.
作为快速测试,我会将您当前的 Observable.Wait(market);
替换为具有足够长睡眠时间的 Thread.Sleep(?)
,以查看您的代码是否正常运行。当然,您需要确保在后台调度程序上生成值(如 Scheduler.Default
)。
我运行这个代码来测试你的查询:
public class CompanyInfo
{
public string Name { get; set; }
public double Value { get; set; }
}
public class StockTrader : IObserver<CompanyInfo>
{
public void OnCompleted()
{
Console.WriteLine("Market Closed");
}
public void OnError(Exception error)
{
Console.WriteLine(error);
}
public void OnNext(CompanyInfo value)
{
WriteStock(value);
}
private void WriteStock(CompanyInfo value) { Console.WriteLine($"{value.Name} = {value.Value}"); }
}
public class StockMarket : IObservable<CompanyInfo>
{
private CompanyInfo[] _values = new CompanyInfo[]
{
new CompanyInfo() { Name = "F", Value = 1 },
new CompanyInfo() { Name = "S", Value = 5 },
new CompanyInfo() { Name = "S", Value = 4 },
new CompanyInfo() { Name = "F", Value = 2 },
};
public IDisposable Subscribe(IObserver<CompanyInfo> observable)
{
return _values.ToObservable().ObserveOn(Scheduler.Default).Subscribe(observable);
}
}
...有了这个:
StockMarket market = new StockMarket();
StockTrader trader = new StockTrader();
IObservable<CompanyInfo> differential = market //[F, 1], [S, 5], [S, 4], [F, 2]
.GroupBy(x => x.Name) //[F, 1], [F, 2]; [S, 5], [S, 4]
.SelectMany(x => x //4, 8, 2, 3
.Buffer(2, 1) //(4, 8), (8, 2), (2, 3), (3)
.SkipLast(1) //(4, 8), (8, 2), (2, 3)
.Select(y => new CompanyInfo //(+100%), (-75%), (+50%)
{
Name = x.Key,
Value = (y[1].Value - y[0].Value) / y[0].Value
}) //[F, +100%]; [S, -20%]
);
IDisposable subscription = differential.Subscribe(trader);
Thread.Sleep(10000);
我一次都没有让它崩溃或导致任何异常。