C TPL 数据流 BatchBlock 从不等待完成 returns
C TPL Dataflow BatchBlock await Completion never returns
我创建了一个小测试程序来说明我的问题
class Item
{
public string Name;
}
class Program
{
static BatchBlock<Item> items = new BatchBlock<Item>(2);
static ConcurrentBag<Item> bag = new ConcurrentBag<Item>();
public static async Task Main(string[] args)
{
var insertItems = new ActionBlock<Item[]>(b => BatchProcessor(b));
items.LinkTo(insertItems);
var finished = items.Completion.ContinueWith(delegate { insertItems.Complete(); }).ConfigureAwait(false);
Console.WriteLine("From main 1");
items.Post(new Item() { Name = "1" });
Console.WriteLine("From main 2");
items.Post(new Item() { Name = "2" });
Console.WriteLine("From main 3");
items.Post(new Item() { Name = "3" });
Console.WriteLine("From main 4");
items.Post(new Item() { Name = "4" });
Console.WriteLine("From main 5");
items.Post(new Item() { Name = "5" });
Console.WriteLine("From main 6");
items.Post(new Item() { Name = "6" });
Console.WriteLine("From main 7");
items.Post(new Item() { Name = "7" });
Console.WriteLine("Finishing");
items.TriggerBatch();
await items.Completion; // Never completes!!!
}
static void BatchProcessor(Item[] items)
{
Console.WriteLine("Callback");
foreach(var i in items)
{
Console.WriteLine("From Callback " + i.Name);
bag.Add(i);
}
}
}
控制台输出符合预期:
From main 1
From main 2
From main 3
From main 4
From main 5
From main 6
From main 7
Finishing
Callback
From Callback 1
From Callback 2
Callback
From Callback 3
From Callback 4
Callback
From Callback 5
From Callback 6
Callback
From Callback 7
我错过了什么?
当您完成向块发送数据后,您需要调用 Complete
。
这个:
await items.Completion;
为了等待整个流程的完成,真的应该是:
items.Complete()
await insertItems.Completion;
最后,您可以 link 阻止传播完成,因此在这种情况下 ContinueWith
不是必需的。
这个:
items.Completion.ContinueWith(delegate { insertItems.Complete(); }).ConfigureAwait(false);
可以替换为:
items.LinkTo(insertItems, new DataflowLinkOptions() { PropagateCompletion = true });
我创建了一个小测试程序来说明我的问题
class Item
{
public string Name;
}
class Program
{
static BatchBlock<Item> items = new BatchBlock<Item>(2);
static ConcurrentBag<Item> bag = new ConcurrentBag<Item>();
public static async Task Main(string[] args)
{
var insertItems = new ActionBlock<Item[]>(b => BatchProcessor(b));
items.LinkTo(insertItems);
var finished = items.Completion.ContinueWith(delegate { insertItems.Complete(); }).ConfigureAwait(false);
Console.WriteLine("From main 1");
items.Post(new Item() { Name = "1" });
Console.WriteLine("From main 2");
items.Post(new Item() { Name = "2" });
Console.WriteLine("From main 3");
items.Post(new Item() { Name = "3" });
Console.WriteLine("From main 4");
items.Post(new Item() { Name = "4" });
Console.WriteLine("From main 5");
items.Post(new Item() { Name = "5" });
Console.WriteLine("From main 6");
items.Post(new Item() { Name = "6" });
Console.WriteLine("From main 7");
items.Post(new Item() { Name = "7" });
Console.WriteLine("Finishing");
items.TriggerBatch();
await items.Completion; // Never completes!!!
}
static void BatchProcessor(Item[] items)
{
Console.WriteLine("Callback");
foreach(var i in items)
{
Console.WriteLine("From Callback " + i.Name);
bag.Add(i);
}
}
}
控制台输出符合预期:
From main 1
From main 2
From main 3
From main 4
From main 5
From main 6
From main 7
Finishing
Callback
From Callback 1
From Callback 2
Callback
From Callback 3
From Callback 4
Callback
From Callback 5
From Callback 6
Callback
From Callback 7
我错过了什么?
当您完成向块发送数据后,您需要调用 Complete
。
这个:
await items.Completion;
为了等待整个流程的完成,真的应该是:
items.Complete()
await insertItems.Completion;
最后,您可以 link 阻止传播完成,因此在这种情况下 ContinueWith
不是必需的。
这个:
items.Completion.ContinueWith(delegate { insertItems.Complete(); }).ConfigureAwait(false);
可以替换为:
items.LinkTo(insertItems, new DataflowLinkOptions() { PropagateCompletion = true });