如何在 TPL 中实现与 BlockingCollection 类似的功能?

How can I achieve similar functionality as BlockingCollection in TPL?

我创建了一个包含 BlockingCollection 的简单 class。它表示将按照接收到的顺序执行的操作队列。我已经阅读了很多关于 TPL 的文章,看起来我应该使用它而不是我目前正在使用的东西。原因之一是单元测试会更容易,而且编写的代码也会更少。我知道您可以使用 Task.Factory.StartNew() 等轻松启动新任务,但不确定如何以与我目前拥有的 class 类似的方式使用它。我怎样才能用 TPL 完成同样的事情?

根据要求,这是我创建的 class:

public class MyService
{
    /// <summary>Queue of actions to be consumed on a separate thread</summary>
    private BlockingCollection<MyObject> queue = new BlockingCollection<MyObject>();

    public MyService()
    {
        StartService();
    }

    public void AddToQueue(MyObject newObject)
    {
        queue.Add(newObject);
    }

    private void StartService()
    {
        System.Threading.Tasks.Task.Factory.StartNew(() =>
        {
            while (true)
            {
                try
                {
                    MyObject myObject = queue.Take(); // blocks until new object received

                    // Do work...
                }
                catch (Exception e)
                {
                    // Log...
                }
            }
        });
    }
}

BlockingCollection 和为简单的生产者-消费者场景创建的异步集合系列。 (例如一个作者和多个读者)

当然 - 你可以设法构建几乎与 Task.Run 相同的东西,它将添加、删除、清理等项目到非同步集合,如 List<T>,但你必须管理所有你自己的多线程问题(而且有很多问题)。

例如:

public class MyService
{
    /// <summary>Queue of actions to be consumed on a separate thread</summary>
    private BlockingCollection<MyObject> queue = new BlockingCollection<MyObject>();
    private IEnumerable<Task> readers = Enumerable.Range(0, 10).Select((t) => new Task(() => this.StartService()));

public MyService()
{
    StartService();
    readers.AsParallel().ForAll(t => t.Start());
}

public void AddToQueue(MyObject newObject)
{
    queue.Add(newObject);
}

private void StartService()
{
        while (true)
        {
            try
            {
                MyObject myObject = queue.Take(); // blocks until new object received

                // Do work...
            }
            catch (Exception e)
            {
                // Log...
            }
        }
    }
}

你看 - 同一个集合中有多个 'reader'。如果您自己完成了 BlockingCollection,您应该处理所有 lock 的收集等问题。

旧时尚,阻塞同步和基于任务的异步不能很好地混合。

Task.Run(() =>
    {
        while (true)
        {
            // some thing that sometimes blocks
        }
    });

只是一种写法

new Thread(() =>
   {
      while (true)
      {
          // some thing that sometimes blocks
      }
});

这两个将几乎永远占用一个线程。第一个会使用线程池中的一个,这应该比专门创建的更好,但是因为它以后永远不会释放它,所以好处就消失了。

如果你想使用任务和 TPL,并从中获益,你应该尽可能避免任何阻塞。例如,您可以使用 ConcurrentQueue 作为后备队列并执行如下操作:

public class MyService
{
    /// <summary>Queue of actions to be consumed by separate task</summary>
    private ConcurrentQueue<MyObject> queue = new ConcurrentQueue<MyObject>();

    private bool _isRunning = false;
    private Task _consumingTask;

    public MyService()
    {
    }

    public void AddToQueue(MyObject newObject)
    {
        queue.Add(newObject);
    }

    private void StartService()
    {
        _isRunning = true;
        Task.Run( async () =>
        {
            while (_isRunning )
            {
                MyObject myObject;

                while(_isRunning && queue.TryDequeue(out myObject)
                {
                    try
                    {
                        // Do work...
                    }
                    catch (Exception e)
                    {
                        // Log...
                    }
                }
                await Task.Delay(100); // tune this value to one pertinent to your use case 
            }
        });
    }

    public void StopService()
    {
        _isRunning = false;
        _consumingTask.Wait();
    }
}

这个实现从不阻塞,只在真正需要计算时占用一个线程。它也很容易优雅地混合其他 Task

TLDR:如果您走 Task 路,请一路走下去。中间点真的不是你想要的,你会得到所有的复杂性和none的优点。