如何让管道中的任务阻塞,直到集合已满后才开始从中移除项目

How to make a task in pipeline block till the collection is full before removing items from it

我有 3 个任务组成了一个管道。我希望终结器任务(管道中的最后一个)仅在 buffer2 达到其边界容量后才开始使用其中的数据。现在,每当有项目到达时它就会立即取走,而我不想要那样。

 static void Main(string[] args)
    {
        // Three-stage pipeline: Producer -> buffer1 -> Consumer -> buffer2 -> Finalizer.
        // The bounded capacities (10 and 5) are the upper limits of the two buffers.
        var source = new List<int>(Enumerable.Range(0, 20));

        var buffer1 = new BlockingCollection<int>(10);
        var buffer2 = new BlockingCollection<int>(5);

        // Each stage gets its own task; they are started in pipeline order.
        Task[] stages =
        {
            Task.Factory.StartNew(() => Producer(source, buffer1)),
            Task.Factory.StartNew(() => Consumer(buffer1, buffer2)),
            Task.Factory.StartNew(() => Finalizer(buffer2)),
        };

        Task.WaitAll(stages);

        // Keep the console open until the user presses Enter.
        Console.ReadLine();
    }

    /// <summary>
    /// Adds every element of <paramref name="input"/> to <paramref name="buffer1"/> in list order
    /// and then marks the buffer as complete for adding.
    /// </summary>
    /// <param name="input">Values to push into the pipeline.</param>
    /// <param name="buffer1">Collection that receives the values.</param>
    private static void Producer(List<int> input, BlockingCollection<int> buffer1)
    {
        input.ForEach(value => buffer1.Add(value));

        // Signal that no further items will be added.
        buffer1.CompleteAdding();
    }

    /// <summary>
    /// Middle pipeline stage: takes items from <paramref name="buffer1"/> as they become available,
    /// logs each one, forwards it to <paramref name="buffer2"/>, and finally marks
    /// <paramref name="buffer2"/> as complete for adding.
    /// </summary>
    /// <param name="buffer1">Collection to consume from.</param>
    /// <param name="buffer2">Collection the consumed items are forwarded to.</param>
    private static void Consumer(BlockingCollection<int> buffer1, BlockingCollection<int> buffer2)
    {
        int current;

        // With an infinite timeout TryTake waits for the next item and only reports false
        // once buffer1 has been marked complete and is empty.
        while (buffer1.TryTake(out current, System.Threading.Timeout.Infinite))
        {
            Console.WriteLine("Consumer saw item " + current);

            buffer2.Add(current);
        }

        // Nothing more will be forwarded downstream.
        buffer2.CompleteAdding();
    }

    /// <summary>
    /// Last pipeline stage: takes items from <paramref name="buffer"/> one at a time as they
    /// become available and returns once the buffer has been marked complete and drained.
    /// </summary>
    /// <param name="buffer">
    /// Collection to consume. The stage that adds to it is responsible for calling CompleteAdding.
    /// </param>
    private static void Finalizer(BlockingCollection<int> buffer)
    {
        foreach (int i in buffer.GetConsumingEnumerable())
        {
            // Do some work
            Console.WriteLine("Finalizer saw item " + i);
            System.Threading.Thread.Sleep(1000);
        }

        // No CompleteAdding() call here: the consuming loop above only ends after the
        // collection has already been marked complete, so the call would be redundant.
    }

如何让终结器在 buffer2 中有 5 个项目之前不消耗其中的项目?消费者也一样:只有当 buffer1 至少有 10 个项目时,它才应该开始消费。

也许你应该看看微软的 TPL Dataflow。

我已对您的代码进行了一些更改以适应 TPL

static void Main(string[] args)
{
    // Pipeline: Producer -> buffer1 (batches of 10) -> Consumer -> buffer2 (batches of 5) -> Finalizer.
    var input = new List<int>(Enumerable.Range(0, 20));

    var buffer1 = new BatchBlock<int>(10);
    var buffer2 = new BatchBlock<int>(5);

    // Each action block receives whole batches (int[]) and handles one batch at a time.
    var consumerStage = new ActionBlock<int[]>(
        batch => Consumer(batch, buffer2),
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
    var finalizerStage = new ActionBlock<int[]>(
        batch => Finalizer(batch),
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });

    // Completion of each batch block is propagated to the action block linked to it.
    buffer1.LinkTo(consumerStage, new DataflowLinkOptions { PropagateCompletion = true });
    buffer2.LinkTo(finalizerStage, new DataflowLinkOptions { PropagateCompletion = true });

    Task producing = Task.Factory.StartNew(() => Producer(input, buffer1));

    Task.WaitAll(producing);
    consumerStage.Completion.Wait();  // by now every item has been posted to buffer2
    buffer2.Complete();               // buffer2 will not get any new items
    finalizerStage.Completion.Wait(); // remaining batches of 5 are processed, then the stage completes
    Console.WriteLine("Process complete");
    Console.ReadLine();
}

/// <summary>
/// Processes one batch of items: reports the batch size, then handles each item in order.
/// </summary>
/// <param name="t">The batch of items to process.</param>
private static void Finalizer(int[] t)
{
    // An array exposes its size through Length; the LINQ Count() extension is unnecessary here.
    Console.WriteLine("Received a batch of items {0}", t.Length);
    foreach (int i in t)
    {
        // Do some work
        Console.WriteLine("Finalizer saw item " + i);
        System.Threading.Thread.Sleep(1000);
    }
}

/// <summary>
/// Logs every item of the batch and posts it to <paramref name="buffer2"/>, keeping the batch order.
/// </summary>
/// <param name="t">The batch of items received from the upstream block.</param>
/// <param name="buffer2">Block the individual items are posted to.</param>
private static void Consumer(int[] t, BatchBlock<int> buffer2)
{
    for (int index = 0; index < t.Length; index++)
    {
        int current = t[index];

        Console.WriteLine("Consumer saw item " + current);
        buffer2.Post(current);
    }
}

/// <summary>
/// Posts every element of <paramref name="input"/> to <paramref name="buffer1"/> in list order
/// and then marks the block as complete.
/// </summary>
/// <param name="input">Values to feed into the data flow.</param>
/// <param name="buffer1">Block that receives the values.</param>
public static void Producer(List<int> input, BatchBlock<int> buffer1)
{
    input.ForEach(value => buffer1.Post(value));

    // Marking the block complete signals the data flow that no new items will arrive.
    buffer1.Complete();
}

这里是我使用的 NuGet 包。

编辑:更新了上面的代码,使其在消费时以最少 10 个项目为一批。至于你的疑问——当数据分布偏斜时它是否仍符合要求——答案是肯定的(YES)。

我知道这已得到解答,但只是想通过实现自己的 GetConsumingEnumerable 版本来展示另一种方法。您通过将集合作为参数传递来调用此方法而不是 BlockingCollection 方法:

    /// <summary>
    /// Consumes <paramref name="sourceCollection"/> but hands items out in groups: taken items are
    /// held back until their number reaches the collection's BoundedCapacity, then yielded in the
    /// order they were taken. Items still held back when the source finishes are yielded at the end.
    /// </summary>
    /// <typeparam name="T">Type of the items in the collection.</typeparam>
    /// <param name="sourceCollection">Collection to consume.</param>
    /// <returns>A lazily evaluated sequence of the consumed items.</returns>
    private IEnumerable<T> GetConsumingEnumerable<T>(BlockingCollection<T> sourceCollection)
    {
        var pending = new List<T>();

        foreach (var taken in sourceCollection.GetConsumingEnumerable())
        {
            pending.Add(taken);

            // Keep collecting until a full group is available.
            if (pending.Count < sourceCollection.BoundedCapacity)
            {
                continue;
            }

            for (int index = 0; index < pending.Count; index++)
            {
                yield return pending[index];
            }

            pending.Clear();
        }

        // The source is exhausted: flush the last, possibly partial, group.
        for (int index = 0; index < pending.Count; index++)
        {
            yield return pending[index];
        }
    }