Severe bugs in SimplePriorityQueue example on MSDN

I need to use a concurrent priority queue, and I was considering adapting the SimplePriorityQueue<TPriority, TValue> sample given in the How to: Add Bounding and Blocking Functionality to a Collection tutorial on MSDN. However, I was surprised by the severity of the bugs that this sample appears to have. Could someone verify whether these issues are really present?

1) A race hazard exists between TryAdd and ToArray, which can cause an ArgumentException to be thrown from the latter. The TryAdd method first adds an item to an internal queue, then increments the m_count counter. On the other hand, ToArray first initializes a new array of size m_count, then copies the internal queues onto this array. If TryAdd is called whilst ToArray is being executed, then ToArray might end up attempting to copy more items than it had allocated space for in the array, causing the CopyTo call to throw an ArgumentException.

private ConcurrentQueue<KeyValuePair<int, TValue>>[] _queues;
private int m_count;

// ...

// IProducerConsumerCollection members
public bool TryAdd(KeyValuePair<int, TValue> item)
{
    _queues[item.Key].Enqueue(item);
    Interlocked.Increment(ref m_count);
    return true;
}    

public int Count
{
    get { return m_count; }
}

public KeyValuePair<int, TValue>[] ToArray()
{
    KeyValuePair<int, TValue>[] result;

    lock (_queues)
    {
        result = new KeyValuePair<int, TValue>[this.Count];
        // *** context switch here; new item gets added ***
        int index = 0;
        foreach (var q in _queues)
        {
            if (q.Count > 0)
            {
                q.CopyTo(result, index);   // *** ArgumentException ***
                index += q.Count;
            }
        }
        return result;
    }
}
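
For what it's worth, one way to avoid the size mismatch might be to stop pre-sizing the array from m_count and instead concatenate per-queue snapshots. This is only a sketch: QueueSnapshot and its ToArray helper are hypothetical names of mine, not part of the MSDN sample. It relies on ConcurrentQueue<T>.ToArray() returning a snapshot whose length always matches its contents.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical helper, not part of the MSDN sample.
public static class QueueSnapshot
{
    // Copies every per-priority queue into a single array, lowest index first.
    public static KeyValuePair<int, TValue>[] ToArray<TValue>(
        ConcurrentQueue<KeyValuePair<int, TValue>>[] queues)
    {
        var result = new List<KeyValuePair<int, TValue>>();
        foreach (var q in queues)
        {
            // Each snapshot is internally consistent, so no pre-computed
            // total count is needed and CopyTo can never overflow.
            result.AddRange(q.ToArray());
        }
        return result.ToArray();
    }
}
```

Note that this only removes the ArgumentException. The combined array is still not an atomic snapshot across all the queues, so an item moved between priorities during the copy could still be seen twice or not at all.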

2) Another race hazard exists in the GetEnumerator method: There is no synchronization between updates to the internal queues.

public IEnumerator<KeyValuePair<int, TValue>> GetEnumerator()
{
    for (int i = 0; i < priorityCount; i++)
    {
        foreach (var item in _queues[i])
            yield return item;
    }
}

Consider the following code snippet, which takes an item from the queue and re-adds it with incremented priority:

if (queue.TryTake(out item) && item.Key < maxPriority - 1)
    queue.TryAdd(new KeyValuePair<int, string>(item.Key + 1, item.Value));

If the above snippet were run concurrently with an enumeration, one would expect the item to appear at most once, either with the original or with the incremented priority, or possibly not to appear at all. One would not expect the item to appear twice, at both priorities. However, since GetEnumerator iterates over its internal queues sequentially, it does not protect against such ordering inconsistencies across queues.

3) The public Count property can return stale values, since it reads the shared m_count field without any memory fences. If a consumer accesses this property in a loop that doesn't generate memory fences of its own, such as the one below, it could be stuck in an infinite loop, despite items being added to the queue by other threads.

while (queue.Count == 0) 
{ }
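
A possible fix for the read side, assuming .NET 4.5 or later where System.Threading.Volatile is available, would be to read the field through Volatile.Read. The FencedCounter class below is a hypothetical stand-in for the counter part of the sample, just to show the idea in isolation.

```csharp
using System.Threading;

// Hypothetical stand-in for the m_count handling in the sample.
public sealed class FencedCounter
{
    private int m_count;

    // Writers keep using an interlocked operation, as the sample does.
    public void Increment()
    {
        Interlocked.Increment(ref m_count);
    }

    // Volatile.Read has acquire semantics, so the read cannot be hoisted
    // out of a spin loop or satisfied from a stale cached value.
    public int Count
    {
        get { return Volatile.Read(ref m_count); }
    }
}
```

With this, a loop such as while (counter.Count == 0) { } should observe an increment made by another thread, although spinning like that is still not something I would want in production code.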

The need for memory fences when reading shared variables has been discussed in several other posts.

4) There is no memory barrier between the initialization of the _queues array and the completion of the SimplePriorityQueue constructor. Race hazards could arise when external consumers on another thread call TryAdd and access _queues before its initialization has completed (or appears as completed on their memory cache). This is discussed further in my other question on constructors and memory barriers.

5) TryTake and ToArray are protected through the use of the lock keyword. Apart from being inadequate (due to the bugs discussed above), this also defeats the entire purpose of designing a concurrent collection. Given its shortcomings, I think the best approach would be to downgrade the internal ConcurrentQueue structures to plain Queue, add locks everywhere, and start treating this as a non-concurrent but thread-safe structure.
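
To make the "plain Queue plus locks" idea concrete, here is a rough sketch of what I have in mind. LockedPriorityQueue and _sync are names I made up. I am also assuming that index 0 is the highest priority and is taken first. This is not a drop-in replacement, since it omits the rest of the IProducerConsumerCollection members.

```csharp
using System.Collections.Generic;

// Hypothetical name; a sketch of the lock-everywhere variant.
public sealed class LockedPriorityQueue<TValue>
{
    private readonly object _sync = new object();
    private readonly Queue<KeyValuePair<int, TValue>>[] _queues;
    private int _count;

    public LockedPriorityQueue(int priorityCount)
    {
        _queues = new Queue<KeyValuePair<int, TValue>>[priorityCount];
        for (int i = 0; i < priorityCount; i++)
            _queues[i] = new Queue<KeyValuePair<int, TValue>>();
    }

    // Reading under the lock gives the fence that a bare field read lacks.
    public int Count
    {
        get { lock (_sync) return _count; }
    }

    public bool TryAdd(KeyValuePair<int, TValue> item)
    {
        lock (_sync)
        {
            // Enqueue and count update happen as one atomic step.
            _queues[item.Key].Enqueue(item);
            _count++;
            return true;
        }
    }

    public bool TryTake(out KeyValuePair<int, TValue> item)
    {
        lock (_sync)
        {
            // Assumption: lower index means higher priority.
            foreach (var q in _queues)
            {
                if (q.Count > 0)
                {
                    item = q.Dequeue();
                    _count--;
                    return true;
                }
            }
            item = default(KeyValuePair<int, TValue>);
            return false;
        }
    }

    public KeyValuePair<int, TValue>[] ToArray()
    {
        lock (_sync)
        {
            // _count cannot change while the lock is held, so the array
            // is always exactly large enough.
            var result = new KeyValuePair<int, TValue>[_count];
            int index = 0;
            foreach (var q in _queues)
            {
                q.CopyTo(result, index);
                index += q.Count;
            }
            return result;
        }
    }
}
```

Every member takes the same lock, so the count and the queues can never disagree. The cost is that producers and consumers now contend on a single lock.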

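I think it depends on how you are going to use the class. If you limit yourself to TryAdd and TryTake (the two main things that BlockingCollection<T> relies on), you shouldn't have any problems, and you'll have a very fast priority queue with minimal locking.

If you start using Count, CopyTo, or any of the other methods, you will likely run into the problems you pointed out.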