如何在 TPL 中实现与 BlockingCollection 类似的功能?
How can I achieve similar functionality as BlockingCollection in TPL?
我创建了一个包含 BlockingCollection 的简单 class。它表示将按照接收到的顺序执行的操作队列。我已经阅读了很多关于 TPL 的文章,看起来我应该使用它而不是我目前正在使用的东西。原因之一是单元测试会更容易,而且编写的代码也会更少。我知道您可以使用 Task.Factory.StartNew() 等轻松启动新任务,但不确定如何以与我目前拥有的 class 类似的方式使用它。我怎样才能用 TPL 完成同样的事情?
根据要求,这是我创建的 class:
public class MyService
{
/// <summary>Queue of actions to be consumed on a separate thread</summary>
private BlockingCollection<MyObject> queue = new BlockingCollection<MyObject>();
public MyService()
{
StartService();
}
public void AddToQueue(MyObject newObject)
{
queue.Add(newObject);
}
private void StartService()
{
System.Threading.Tasks.Task.Factory.StartNew(() =>
{
while (true)
{
try
{
MyObject myObject = queue.Take(); // blocks until new object received
// Do work...
}
catch (Exception e)
{
// Log...
}
}
});
}
}
BlockingCollection
和为简单的生产者-消费者场景创建的异步集合系列。 (例如一个作者和多个读者)
当然 - 你可以设法构建几乎与 Task.Run
相同的东西,它将添加、删除、清理等项目到非同步集合,如 List<T>
,但你必须管理所有你自己的多线程问题(而且有很多问题)。
例如:
public class MyService
{
/// <summary>Queue of actions to be consumed on a separate thread</summary>
private BlockingCollection<MyObject> queue = new BlockingCollection<MyObject>();
private IEnumerable<Task> readers = Enumerable.Range(0, 10).Select((t) => new Task(() => this.StartService()));
public MyService()
{
StartService();
readers.AsParallel().ForAll(t => t.Start());
}
public void AddToQueue(MyObject newObject)
{
queue.Add(newObject);
}
private void StartService()
{
while (true)
{
try
{
MyObject myObject = queue.Take(); // blocks until new object received
// Do work...
}
catch (Exception e)
{
// Log...
}
}
}
}
你看 - 同一个集合中有多个 'reader'。如果您自己完成了 BlockingCollection
,您应该处理所有 lock
的收集等问题。
旧时尚,阻塞同步和基于任务的异步不能很好地混合。
Task.Run(() =>
{
while (true)
{
// some thing that sometimes blocks
}
});
只是一种写法
new Thread(() =>
{
while (true)
{
// some thing that sometimes blocks
}
});
这两个将几乎永远占用一个线程。第一个会使用线程池中的一个,这应该比专门创建的更好,但是因为它以后永远不会释放它,所以好处就消失了。
如果你想使用任务和 TPL,并从中获益,你应该尽可能避免任何阻塞。例如,您可以使用 ConcurrentQueue
作为后备队列并执行如下操作:
public class MyService
{
/// <summary>Queue of actions to be consumed by separate task</summary>
private ConcurrentQueue<MyObject> queue = new ConcurrentQueue<MyObject>();
private bool _isRunning = false;
private Task _consumingTask;
public MyService()
{
}
public void AddToQueue(MyObject newObject)
{
queue.Add(newObject);
}
private void StartService()
{
_isRunning = true;
Task.Run( async () =>
{
while (_isRunning )
{
MyObject myObject;
while(_isRunning && queue.TryDequeue(out myObject)
{
try
{
// Do work...
}
catch (Exception e)
{
// Log...
}
}
await Task.Delay(100); // tune this value to one pertinent to your use case
}
});
}
public void StopService()
{
_isRunning = false;
_consumingTask.Wait();
}
}
这个实现从不阻塞,只在真正需要计算时占用一个线程。它也很容易优雅地混合其他 Task
。
TLDR:如果您走 Task
路,请一路走下去。中间点真的不是你想要的,你会得到所有的复杂性和none的优点。
我创建了一个包含 BlockingCollection 的简单 class。它表示将按照接收到的顺序执行的操作队列。我已经阅读了很多关于 TPL 的文章,看起来我应该使用它而不是我目前正在使用的东西。原因之一是单元测试会更容易,而且编写的代码也会更少。我知道您可以使用 Task.Factory.StartNew() 等轻松启动新任务,但不确定如何以与我目前拥有的 class 类似的方式使用它。我怎样才能用 TPL 完成同样的事情?
根据要求,这是我创建的 class:
public class MyService
{
/// <summary>Queue of actions to be consumed on a separate thread</summary>
private BlockingCollection<MyObject> queue = new BlockingCollection<MyObject>();
public MyService()
{
StartService();
}
public void AddToQueue(MyObject newObject)
{
queue.Add(newObject);
}
private void StartService()
{
System.Threading.Tasks.Task.Factory.StartNew(() =>
{
while (true)
{
try
{
MyObject myObject = queue.Take(); // blocks until new object received
// Do work...
}
catch (Exception e)
{
// Log...
}
}
});
}
}
BlockingCollection
和为简单的生产者-消费者场景创建的异步集合系列。 (例如一个作者和多个读者)
当然 - 你可以设法构建几乎与 Task.Run
相同的东西,它将添加、删除、清理等项目到非同步集合,如 List<T>
,但你必须管理所有你自己的多线程问题(而且有很多问题)。
例如:
public class MyService
{
/// <summary>Queue of actions to be consumed on a separate thread</summary>
private BlockingCollection<MyObject> queue = new BlockingCollection<MyObject>();
private IEnumerable<Task> readers = Enumerable.Range(0, 10).Select((t) => new Task(() => this.StartService()));
public MyService()
{
StartService();
readers.AsParallel().ForAll(t => t.Start());
}
public void AddToQueue(MyObject newObject)
{
queue.Add(newObject);
}
private void StartService()
{
while (true)
{
try
{
MyObject myObject = queue.Take(); // blocks until new object received
// Do work...
}
catch (Exception e)
{
// Log...
}
}
}
}
你看 - 同一个集合中有多个 'reader'。如果您自己完成了 BlockingCollection
,您应该处理所有 lock
的收集等问题。
旧时尚,阻塞同步和基于任务的异步不能很好地混合。
Task.Run(() =>
{
while (true)
{
// some thing that sometimes blocks
}
});
只是一种写法
new Thread(() =>
{
while (true)
{
// some thing that sometimes blocks
}
});
这两个将几乎永远占用一个线程。第一个会使用线程池中的一个,这应该比专门创建的更好,但是因为它以后永远不会释放它,所以好处就消失了。
如果你想使用任务和 TPL,并从中获益,你应该尽可能避免任何阻塞。例如,您可以使用 ConcurrentQueue
作为后备队列并执行如下操作:
public class MyService
{
/// <summary>Queue of actions to be consumed by separate task</summary>
private ConcurrentQueue<MyObject> queue = new ConcurrentQueue<MyObject>();
private bool _isRunning = false;
private Task _consumingTask;
public MyService()
{
}
public void AddToQueue(MyObject newObject)
{
queue.Add(newObject);
}
private void StartService()
{
_isRunning = true;
Task.Run( async () =>
{
while (_isRunning )
{
MyObject myObject;
while(_isRunning && queue.TryDequeue(out myObject)
{
try
{
// Do work...
}
catch (Exception e)
{
// Log...
}
}
await Task.Delay(100); // tune this value to one pertinent to your use case
}
});
}
public void StopService()
{
_isRunning = false;
_consumingTask.Wait();
}
}
这个实现从不阻塞,只在真正需要计算时占用一个线程。它也很容易优雅地混合其他 Task
。
TLDR:如果您走 Task
路,请一路走下去。中间点真的不是你想要的,你会得到所有的复杂性和none的优点。