线程安全的有限大小队列
Thread safe limited size queue
我正在尝试编写一个主题队列,但我遇到了死锁和其他多线程问题。我想使用 Interlocked.CompareExchange
来避免使用 lock
。但是这段代码没有按预期工作:它只是擦除整个队列。我在这里做错了什么?
public class FixedSizedQueue<T> : IEnumerable<T>
{
readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
public int Limit { get; set; }
public FixedSizedQueue(int limit)
{
Limit = limit;
}
public void Enqueue(T obj)
{
_queue.Enqueue(obj);
if (_queue.Count <= Limit)
return;
int count = _queue.Count;
if (_queue.Count != Interlocked.CompareExchange(ref count, count, _queue.Count))
{
T overflow;
while (_queue.TryDequeue(out overflow))
{
}
}
}
public T[] ToArray()
{
return _queue.ToArray();
}
public IEnumerator<T> GetEnumerator()
{
return _queue.GetEnumerator();
}
IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}
}
也许我只需要另一个线程来切断队列...
Interlocked.CompareExchange
在堆栈变量 count
上没有意义,因为它是从单线程访问的。正如我猜想的那样,您尝试在 _queue.Count
上使用此方法,但编译失败,因为 .Count
是一个 属性,而不是一个简单的变量。所以你需要在 your class.
中定义计数器
public class FixedSizedQueue<T> : IEnumerable<T>
{
readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
int CountShadow = 0; // Counter for check constraints.
public int Limit { get; set; }
public FixedSizedQueue(int limit)
{
Limit = limit;
}
public void Enqueue(T obj)
{
/* Update shadow counter first for check constraints. */
int count = CountShadow;
while(true)
{
if(count => Limit) return; // Adding element would violate constraint
int countOld = Interlocked.CompareExchange(ref CountShadow, count, count + 1);
if(countOld == count) break; //Successful update
count = countOld;
}
_queue.Enqueue(obj); // This will update real counter.
}
...
}
此外,您需要为 Limit
属性 设置自己的 setter,这将保持不变 CountShadow <= Limit
。或者只是禁止用户在对象构造后设置 属性。
我正在尝试编写一个主题队列,但我遇到了死锁和其他多线程问题。我想使用 Interlocked.CompareExchange
来避免使用 lock
。但是这段代码没有按预期工作:它只是擦除整个队列。我在这里做错了什么?
public class FixedSizedQueue<T> : IEnumerable<T>
{
readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
public int Limit { get; set; }
public FixedSizedQueue(int limit)
{
Limit = limit;
}
public void Enqueue(T obj)
{
_queue.Enqueue(obj);
if (_queue.Count <= Limit)
return;
int count = _queue.Count;
if (_queue.Count != Interlocked.CompareExchange(ref count, count, _queue.Count))
{
T overflow;
while (_queue.TryDequeue(out overflow))
{
}
}
}
public T[] ToArray()
{
return _queue.ToArray();
}
public IEnumerator<T> GetEnumerator()
{
return _queue.GetEnumerator();
}
IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}
}
也许我只需要另一个线程来切断队列...
Interlocked.CompareExchange
在堆栈变量 count
上没有意义,因为它是从单线程访问的。正如我猜想的那样,您尝试在 _queue.Count
上使用此方法,但编译失败,因为 .Count
是一个 属性,而不是一个简单的变量。所以你需要在 your class.
public class FixedSizedQueue<T> : IEnumerable<T>
{
readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
int CountShadow = 0; // Counter for check constraints.
public int Limit { get; set; }
public FixedSizedQueue(int limit)
{
Limit = limit;
}
public void Enqueue(T obj)
{
/* Update shadow counter first for check constraints. */
int count = CountShadow;
while(true)
{
if(count => Limit) return; // Adding element would violate constraint
int countOld = Interlocked.CompareExchange(ref CountShadow, count, count + 1);
if(countOld == count) break; //Successful update
count = countOld;
}
_queue.Enqueue(obj); // This will update real counter.
}
...
}
此外,您需要为 Limit
属性 设置自己的 setter,这将保持不变 CountShadow <= Limit
。或者只是禁止用户在对象构造后设置 属性。