如何让管道中的任务阻塞，直到集合已满后才开始从中移除项目
How to make a task in pipeline block till the collection is full before removing items from it
我有 3 个任务组成了一个管道。我希望终结器任务（管道中的最后一个）仅在 buffer2 达到其有界容量（BoundedCapacity）后才开始消费其中的数据。现在，它会在每个项目到达时立即取走它，而这不是我想要的。
/// <summary>
/// Runs a three-stage pipeline (Producer -> buffer1 -> Consumer -> buffer2 -> Finalizer),
/// waits for every stage to finish, then waits for a line of console input.
/// </summary>
static void Main(string[] args)
{
    List<int> input = Enumerable.Range(0, 20).ToList();

    // BlockingCollection<T> is IDisposable; both buffers are disposed only after
    // Task.WaitAll has observed every stage as finished, so no stage still uses them.
    using (BlockingCollection<int> buffer1 = new BlockingCollection<int>(10))
    using (BlockingCollection<int> buffer2 = new BlockingCollection<int>(5))
    {
        Task producer = Task.Run(() => Producer(input, buffer1));
        Task consumer = Task.Run(() => Consumer(buffer1, buffer2));
        Task finalizer = Task.Run(() => Finalizer(buffer2));

        // Throws AggregateException if any stage faulted.
        Task.WaitAll(producer, consumer, finalizer);
    }

    Console.ReadLine();
}
/// <summary>
/// Adds every element of <paramref name="input"/> to <paramref name="buffer1"/>, then marks
/// <paramref name="buffer1"/> as complete for adding.
/// </summary>
/// <remarks>
/// CompleteAdding runs in a <c>finally</c> block. If adding fails (for example because
/// <paramref name="input"/> is null), a reader blocked in GetConsumingEnumerable() is still
/// released instead of waiting forever. The original exception still propagates.
/// </remarks>
private static void Producer(List<int> input, BlockingCollection<int> buffer1)
{
    try
    {
        foreach (int i in input)
        {
            // Blocks while buffer1 is bounded and currently full.
            buffer1.Add(i);
        }
    }
    finally
    {
        buffer1.CompleteAdding();
    }
}
/// <summary>
/// Middle pipeline stage: moves every item from <paramref name="buffer1"/> to
/// <paramref name="buffer2"/>, logging each one, until <paramref name="buffer1"/> is
/// completed and empty. Then marks <paramref name="buffer2"/> as complete for adding.
/// </summary>
/// <remarks>
/// CompleteAdding on <paramref name="buffer2"/> runs in a <c>finally</c> block, so the reader
/// of <paramref name="buffer2"/> is released even if this stage fails. Otherwise it would block
/// forever. The original exception still propagates.
/// </remarks>
private static void Consumer(BlockingCollection<int> buffer1, BlockingCollection<int> buffer2)
{
    try
    {
        foreach (int i in buffer1.GetConsumingEnumerable())
        {
            Console.WriteLine("Consumer saw item " + i);
            buffer2.Add(i);
        }
    }
    finally
    {
        buffer2.CompleteAdding();
    }
}
/// <summary>
/// Final pipeline stage: takes items from <paramref name="buffer"/> and logs each one,
/// pausing about one second per item to simulate work. It blocks while the collection is
/// empty and returns once adding has been completed and every item has been taken.
/// </summary>
/// <remarks>
/// This reader does not call CompleteAdding on <paramref name="buffer"/>. Signalling
/// completion is the writer's job. The GetConsumingEnumerable() loop can only end after adding
/// has already been completed, so such a call here would be a no-op anyway.
/// </remarks>
private static void Finalizer(BlockingCollection<int> buffer)
{
    foreach (int i in buffer.GetConsumingEnumerable())
    {
        // Do some work
        Console.WriteLine("Finalizer saw item " + i);
        System.Threading.Thread.Sleep(1000);
    }
}
如何让终结器在 buffer2 中积累到 5 个项目之前不从中消费项目？消费者也一样：只有当 buffer1 中至少有 10 个项目时，它才应该开始消费。
也许你应该看看 Microsoft 的 TPL Dataflow。
我对你的代码做了一些修改，使其改用 TPL Dataflow 实现：
/// <summary>
/// TPL Dataflow version of the pipeline.
/// The first BatchBlock groups the input into arrays of 10. Each array is handed to Consumer,
/// which re-posts the items one by one into a second BatchBlock that groups them into arrays of 5
/// for Finalizer. Waits for every stage, then for a line of console input.
/// </summary>
static void Main(string[] args)
{
    List<int> input = Enumerable.Range(0, 20).ToList();

    var batchesOfTen = new BatchBlock<int>(10);
    var batchesOfFive = new BatchBlock<int>(5);
    var propagate = new DataflowLinkOptions { PropagateCompletion = true };

    // Both action blocks handle a single batch at a time.
    var consumeStage = new ActionBlock<int[]>(
        batch => { Consumer(batch, batchesOfFive); },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
    batchesOfTen.LinkTo(consumeStage, propagate);

    var finalizeStage = new ActionBlock<int[]>(
        batch => Finalizer(batch),
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
    batchesOfFive.LinkTo(finalizeStage, propagate);

    // Feed the first stage on a background task and wait until everything has been posted.
    Task producing = Task.Factory.StartNew(() => Producer(input, batchesOfTen));
    producing.Wait();

    // When the consume stage is done, every item has been posted to batchesOfFive.
    // Nothing propagates completion into batchesOfFive, so it is completed by hand.
    consumeStage.Completion.Wait();
    batchesOfFive.Complete();

    // Completing batchesOfFive flushes any partial batch. Wait until the finalizer has
    // processed every batch it receives.
    finalizeStage.Completion.Wait();

    Console.WriteLine("Process complete");
    Console.ReadLine();
}
/// <summary>
/// Final stage of the Dataflow pipeline: logs the size of the received batch, then logs each
/// item, pausing about one second per item to simulate work.
/// </summary>
/// <param name="t">The batch to process; must not be null.</param>
/// <exception cref="ArgumentNullException"><paramref name="t"/> is null.</exception>
private static void Finalizer(int[] t)
{
    if (t == null)
    {
        throw new ArgumentNullException(nameof(t));
    }

    // Arrays expose their size directly; no need for LINQ's Count().
    Console.WriteLine("Received a batch of items {0}", t.Length);
    foreach (int i in t)
    {
        // Do some work
        Console.WriteLine("Finalizer saw item " + i);
        System.Threading.Thread.Sleep(1000);
    }
}
/// <summary>
/// Middle stage of the Dataflow pipeline: logs each item of batch <paramref name="t"/> and
/// posts it individually to <paramref name="buffer2"/>, which re-batches it.
/// </summary>
/// <exception cref="InvalidOperationException">
/// <paramref name="buffer2"/> declined an item (for example because it was already completed).
/// </exception>
private static void Consumer(int[] t, BatchBlock<int> buffer2)
{
    foreach (var item in t)
    {
        Console.WriteLine("Consumer saw item " + item);

        // Post returns false when the target declines the item. Report it instead of
        // losing the item silently.
        if (!buffer2.Post(item))
        {
            throw new InvalidOperationException("buffer2 declined item " + item + ".");
        }
    }
}
/// <summary>
/// First stage of the Dataflow pipeline: posts every element of <paramref name="input"/> to
/// <paramref name="buffer1"/>, then completes <paramref name="buffer1"/>.
/// </summary>
/// <exception cref="InvalidOperationException">
/// <paramref name="buffer1"/> declined an item (for example because it was already completed).
/// </exception>
public static void Producer(List<int> input, BatchBlock<int> buffer1)
{
    try
    {
        foreach (int i in input)
        {
            // Post returns false when the block declines the item. Do not drop it silently.
            if (!buffer1.Post(i))
            {
                throw new InvalidOperationException("buffer1 declined item " + i + ".");
            }
        }
    }
    finally
    {
        // Once marked complete, the block accepts no new items and emits any leftovers as a
        // final (smaller) batch. Doing this in finally lets the block, and anything linked to
        // it with completion propagation, complete even if posting fails.
        buffer1.Complete();
    }
}
这里（Here）是我使用的 NuGet 包。
编辑：已更新上面的代码，使消费者每次以 10 个项目为一批进行消费（完成时剩余不足 10 个的项目会作为最后一批输出）。至于你怀疑在数据分布不均（偏斜）时它是否仍符合要求，答案是肯定的。
我知道这个问题已经有答案了，但我想通过实现自己的 GetConsumingEnumerable 版本来展示另一种做法。调用时将集合作为参数传给下面这个方法，以代替 BlockingCollection 自带的 GetConsumingEnumerable 方法：
/// <summary>
/// Alternative to BlockingCollection's own GetConsumingEnumerable() that releases items in
/// groups. Items are taken from <paramref name="sourceCollection"/> into a private list and
/// yielded only once the list holds <c>BoundedCapacity</c> items. When the source completes,
/// any remaining partial group is yielded too.
/// </summary>
/// <remarks>
/// For an unbounded collection BoundedCapacity is -1, so every item is yielded as soon as it
/// is taken. Items moved into the private list have already been removed from the collection.
/// If the caller stops enumerating early, those items are never handed out.
/// </remarks>
private IEnumerable<T> GetConsumingEnumerable<T>(BlockingCollection<T> sourceCollection)
{
    // Items taken from the collection but not yet handed to the caller.
    var pending = new List<T>();

    foreach (T next in sourceCollection.GetConsumingEnumerable())
    {
        pending.Add(next);
        if (pending.Count < sourceCollection.BoundedCapacity)
        {
            continue;
        }

        // A full group has been collected: release it and start a new one.
        for (int index = 0; index < pending.Count; index++)
        {
            yield return pending[index];
        }
        pending.Clear();
    }

    // The source has completed: release whatever partial group is left.
    for (int index = 0; index < pending.Count; index++)
    {
        yield return pending[index];
    }
}
我有 3 个任务组成了一个管道。我希望终结器任务（管道中的最后一个）仅在 buffer2 达到其有界容量（BoundedCapacity）后才开始消费其中的数据。现在，它会在每个项目到达时立即取走它，而这不是我想要的。
/// <summary>
/// Runs a three-stage pipeline (Producer -> buffer1 -> Consumer -> buffer2 -> Finalizer),
/// waits for every stage to finish, then waits for a line of console input.
/// </summary>
static void Main(string[] args)
{
    List<int> input = Enumerable.Range(0, 20).ToList();

    // BlockingCollection<T> is IDisposable; both buffers are disposed only after
    // Task.WaitAll has observed every stage as finished, so no stage still uses them.
    using (BlockingCollection<int> buffer1 = new BlockingCollection<int>(10))
    using (BlockingCollection<int> buffer2 = new BlockingCollection<int>(5))
    {
        Task producer = Task.Run(() => Producer(input, buffer1));
        Task consumer = Task.Run(() => Consumer(buffer1, buffer2));
        Task finalizer = Task.Run(() => Finalizer(buffer2));

        // Throws AggregateException if any stage faulted.
        Task.WaitAll(producer, consumer, finalizer);
    }

    Console.ReadLine();
}
/// <summary>
/// Adds every element of <paramref name="input"/> to <paramref name="buffer1"/>, then marks
/// <paramref name="buffer1"/> as complete for adding.
/// </summary>
/// <remarks>
/// CompleteAdding runs in a <c>finally</c> block. If adding fails (for example because
/// <paramref name="input"/> is null), a reader blocked in GetConsumingEnumerable() is still
/// released instead of waiting forever. The original exception still propagates.
/// </remarks>
private static void Producer(List<int> input, BlockingCollection<int> buffer1)
{
    try
    {
        foreach (int i in input)
        {
            // Blocks while buffer1 is bounded and currently full.
            buffer1.Add(i);
        }
    }
    finally
    {
        buffer1.CompleteAdding();
    }
}
/// <summary>
/// Middle pipeline stage: moves every item from <paramref name="buffer1"/> to
/// <paramref name="buffer2"/>, logging each one, until <paramref name="buffer1"/> is
/// completed and empty. Then marks <paramref name="buffer2"/> as complete for adding.
/// </summary>
/// <remarks>
/// CompleteAdding on <paramref name="buffer2"/> runs in a <c>finally</c> block, so the reader
/// of <paramref name="buffer2"/> is released even if this stage fails. Otherwise it would block
/// forever. The original exception still propagates.
/// </remarks>
private static void Consumer(BlockingCollection<int> buffer1, BlockingCollection<int> buffer2)
{
    try
    {
        foreach (int i in buffer1.GetConsumingEnumerable())
        {
            Console.WriteLine("Consumer saw item " + i);
            buffer2.Add(i);
        }
    }
    finally
    {
        buffer2.CompleteAdding();
    }
}
/// <summary>
/// Final pipeline stage: takes items from <paramref name="buffer"/> and logs each one,
/// pausing about one second per item to simulate work. It blocks while the collection is
/// empty and returns once adding has been completed and every item has been taken.
/// </summary>
/// <remarks>
/// This reader does not call CompleteAdding on <paramref name="buffer"/>. Signalling
/// completion is the writer's job. The GetConsumingEnumerable() loop can only end after adding
/// has already been completed, so such a call here would be a no-op anyway.
/// </remarks>
private static void Finalizer(BlockingCollection<int> buffer)
{
    foreach (int i in buffer.GetConsumingEnumerable())
    {
        // Do some work
        Console.WriteLine("Finalizer saw item " + i);
        System.Threading.Thread.Sleep(1000);
    }
}
如何让终结器在 buffer2 中积累到 5 个项目之前不从中消费项目？消费者也一样：只有当 buffer1 中至少有 10 个项目时，它才应该开始消费。
也许你应该看看 Microsoft 的 TPL Dataflow。
我对你的代码做了一些修改，使其改用 TPL Dataflow 实现：
/// <summary>
/// TPL Dataflow version of the pipeline.
/// The first BatchBlock groups the input into arrays of 10. Each array is handed to Consumer,
/// which re-posts the items one by one into a second BatchBlock that groups them into arrays of 5
/// for Finalizer. Waits for every stage, then for a line of console input.
/// </summary>
static void Main(string[] args)
{
    List<int> input = Enumerable.Range(0, 20).ToList();

    var batchesOfTen = new BatchBlock<int>(10);
    var batchesOfFive = new BatchBlock<int>(5);
    var propagate = new DataflowLinkOptions { PropagateCompletion = true };

    // Both action blocks handle a single batch at a time.
    var consumeStage = new ActionBlock<int[]>(
        batch => { Consumer(batch, batchesOfFive); },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
    batchesOfTen.LinkTo(consumeStage, propagate);

    var finalizeStage = new ActionBlock<int[]>(
        batch => Finalizer(batch),
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
    batchesOfFive.LinkTo(finalizeStage, propagate);

    // Feed the first stage on a background task and wait until everything has been posted.
    Task producing = Task.Factory.StartNew(() => Producer(input, batchesOfTen));
    producing.Wait();

    // When the consume stage is done, every item has been posted to batchesOfFive.
    // Nothing propagates completion into batchesOfFive, so it is completed by hand.
    consumeStage.Completion.Wait();
    batchesOfFive.Complete();

    // Completing batchesOfFive flushes any partial batch. Wait until the finalizer has
    // processed every batch it receives.
    finalizeStage.Completion.Wait();

    Console.WriteLine("Process complete");
    Console.ReadLine();
}
/// <summary>
/// Final stage of the Dataflow pipeline: logs the size of the received batch, then logs each
/// item, pausing about one second per item to simulate work.
/// </summary>
/// <param name="t">The batch to process; must not be null.</param>
/// <exception cref="ArgumentNullException"><paramref name="t"/> is null.</exception>
private static void Finalizer(int[] t)
{
    if (t == null)
    {
        throw new ArgumentNullException(nameof(t));
    }

    // Arrays expose their size directly; no need for LINQ's Count().
    Console.WriteLine("Received a batch of items {0}", t.Length);
    foreach (int i in t)
    {
        // Do some work
        Console.WriteLine("Finalizer saw item " + i);
        System.Threading.Thread.Sleep(1000);
    }
}
/// <summary>
/// Middle stage of the Dataflow pipeline: logs each item of batch <paramref name="t"/> and
/// posts it individually to <paramref name="buffer2"/>, which re-batches it.
/// </summary>
/// <exception cref="InvalidOperationException">
/// <paramref name="buffer2"/> declined an item (for example because it was already completed).
/// </exception>
private static void Consumer(int[] t, BatchBlock<int> buffer2)
{
    foreach (var item in t)
    {
        Console.WriteLine("Consumer saw item " + item);

        // Post returns false when the target declines the item. Report it instead of
        // losing the item silently.
        if (!buffer2.Post(item))
        {
            throw new InvalidOperationException("buffer2 declined item " + item + ".");
        }
    }
}
/// <summary>
/// First stage of the Dataflow pipeline: posts every element of <paramref name="input"/> to
/// <paramref name="buffer1"/>, then completes <paramref name="buffer1"/>.
/// </summary>
/// <exception cref="InvalidOperationException">
/// <paramref name="buffer1"/> declined an item (for example because it was already completed).
/// </exception>
public static void Producer(List<int> input, BatchBlock<int> buffer1)
{
    try
    {
        foreach (int i in input)
        {
            // Post returns false when the block declines the item. Do not drop it silently.
            if (!buffer1.Post(i))
            {
                throw new InvalidOperationException("buffer1 declined item " + i + ".");
            }
        }
    }
    finally
    {
        // Once marked complete, the block accepts no new items and emits any leftovers as a
        // final (smaller) batch. Doing this in finally lets the block, and anything linked to
        // it with completion propagation, complete even if posting fails.
        buffer1.Complete();
    }
}
这里（Here）是我使用的 NuGet 包。
编辑：已更新上面的代码，使消费者每次以 10 个项目为一批进行消费（完成时剩余不足 10 个的项目会作为最后一批输出）。至于你怀疑在数据分布不均（偏斜）时它是否仍符合要求，答案是肯定的。
我知道这个问题已经有答案了，但我想通过实现自己的 GetConsumingEnumerable 版本来展示另一种做法。调用时将集合作为参数传给下面这个方法，以代替 BlockingCollection 自带的 GetConsumingEnumerable 方法：
/// <summary>
/// Alternative to BlockingCollection's own GetConsumingEnumerable() that releases items in
/// groups. Items are taken from <paramref name="sourceCollection"/> into a private list and
/// yielded only once the list holds <c>BoundedCapacity</c> items. When the source completes,
/// any remaining partial group is yielded too.
/// </summary>
/// <remarks>
/// For an unbounded collection BoundedCapacity is -1, so every item is yielded as soon as it
/// is taken. Items moved into the private list have already been removed from the collection.
/// If the caller stops enumerating early, those items are never handed out.
/// </remarks>
private IEnumerable<T> GetConsumingEnumerable<T>(BlockingCollection<T> sourceCollection)
{
    // Items taken from the collection but not yet handed to the caller.
    var pending = new List<T>();

    foreach (T next in sourceCollection.GetConsumingEnumerable())
    {
        pending.Add(next);
        if (pending.Count < sourceCollection.BoundedCapacity)
        {
            continue;
        }

        // A full group has been collected: release it and start a new one.
        for (int index = 0; index < pending.Count; index++)
        {
            yield return pending[index];
        }
        pending.Clear();
    }

    // The source has completed: release whatever partial group is left.
    for (int index = 0; index < pending.Count; index++)
    {
        yield return pending[index];
    }
}