TPL Dataflow: Why does EnsureOrdered = false destroy parallelism for this TransformManyBlock?

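I am working on a TPL Dataflow pipeline and noticed some strange behavior related to ordering/parallelism in TransformManyBlock (it may apply to other blocks as well).

Here is my code to reproduce it (.NET 4.7.2, TPL Dataflow 4.9.0):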

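using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks.Dataflow;

class Program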
{
    static void Main(string[] args)
    {
        var sourceBlock = new TransformManyBlock<int, Tuple<int, int>>(i => Source(i),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4, EnsureOrdered = false });

        var targetBlock = new ActionBlock<Tuple<int, int>>(tpl =>
        {
            Console.WriteLine($"Received ({tpl.Item1}, {tpl.Item2})");
        },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4, EnsureOrdered = true });

        sourceBlock.LinkTo(targetBlock, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 0; i < 10; i++)
        {
            sourceBlock.Post(i);
        }

        sourceBlock.Complete();
        targetBlock.Completion.Wait();
        Console.WriteLine("Finished");
        Console.Read();
    }

    static IEnumerable<Tuple<int, int>> Source(int i)
    {
        var rand = new Random(543543254);
        for (int j = 0; j < i; j++)
        {
            Thread.Sleep(rand.Next(100, 1500));
            Console.WriteLine($"Returning ({i}, {j})");
            yield return Tuple.Create(i, j);
        }
    }
}

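The behavior I want is as follows:

As I understand it, the nature of yield return already satisfies the ordering requirement, so EnsureOrdered can be set to false. If it is set to true, the source block holds on to messages for an unacceptable amount of time, because it waits for all of the yield returns for an input to complete before passing the messages on (the real application processes many GB of data, so we want to propagate data through the pipeline as quickly as possible in order to free up RAM). This is sample output when EnsureOrdered of the source block is set to true: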
Returning (1, 0)
Returning (2, 0)
Returning (4, 0)
Returning (3, 0)
Returning (2, 1)
Returning (4, 1)
Returning (3, 1)
Received (1, 0)
Received (2, 0)
Received (2, 1)
Returning (4, 2)
Returning (3, 2)
Received (3, 0)
Received (3, 1)
Received (3, 2)
Returning (5, 0)
Returning (6, 0)

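We can see that the source block works in parallel, but it waits to propagate messages until all messages for the next i in line have been generated (as expected).

However, when EnsureOrdered for the source block is false (as in the code sample), I get the following output: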

Returning (2, 0)
Received (2, 0)
Returning (2, 1)
Received (2, 1)
Returning (4, 0)
Received (4, 0)
Returning (4, 1)
Received (4, 1)
Returning (4, 2)
Received (4, 2)
Returning (4, 3)
Received (4, 3)
Returning (1, 0)
Received (1, 0)
Returning (3, 0)
Received (3, 0)
Returning (3, 1)
Received (3, 1)
Returning (3, 2)
Received (3, 2)

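The source block successfully propagates messages as soon as they are available, but it seems to lose parallelism, since it only works on one i at a time.

Why is this? How can I force it to process in parallel?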

This issue is being fixed here: https://github.com/dotnet/corefx/pull/31059
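Until you are on a build that contains the fix, one possible workaround is to avoid TransformManyBlock for this stage and enumerate each input yourself. The following is only a minimal sketch under assumptions, not the approach taken in the pull request: the class name ManualFanOut, the Run helper and the maxDelayMs parameter are hypothetical and exist only for illustration. It replaces the source block with an ActionBlock<int> that walks the iterator and forwards each item to the target with SendAsync as soon as it is yielded, so several inputs are enumerated concurrently and nothing is held back for ordering.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks.Dataflow;

static class ManualFanOut
{
    // Hypothetical helper: runs the pipeline for inputs 0..inputCount-1 and
    // returns every tuple the target block received.
    public static ConcurrentQueue<Tuple<int, int>> Run(int inputCount, int maxDelayMs)
    {
        var received = new ConcurrentQueue<Tuple<int, int>>();

        var targetBlock = new ActionBlock<Tuple<int, int>>(tpl =>
        {
            Console.WriteLine($"Received ({tpl.Item1}, {tpl.Item2})");
            received.Enqueue(tpl);
        },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

        // Stand-in for the TransformManyBlock: up to four inputs are enumerated
        // at the same time, and each item is sent on as soon as it is yielded.
        var sourceBlock = new ActionBlock<int>(async i =>
        {
            foreach (var item in Source(i, maxDelayMs))
            {
                await targetBlock.SendAsync(item);
            }
        },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

        for (int i = 0; i < inputCount; i++)
        {
            sourceBlock.Post(i);
        }

        // The blocks are not linked, so completion is propagated by hand.
        sourceBlock.Complete();
        sourceBlock.Completion.Wait();
        targetBlock.Complete();
        targetBlock.Completion.Wait();
        return received;
    }

    // Same iterator as in the question, with the upper sleep bound as a parameter.
    static IEnumerable<Tuple<int, int>> Source(int i, int maxDelayMs)
    {
        var rand = new Random(543543254);
        for (int j = 0; j < i; j++)
        {
            Thread.Sleep(rand.Next(0, maxDelayMs + 1));
            Console.WriteLine($"Returning ({i}, {j})");
            yield return Tuple.Create(i, j);
        }
    }
}
```

Calling ManualFanOut.Run(10, 1500) from Main roughly corresponds to the repro above. Because the target is not reached through LinkTo, PropagateCompletion does not apply, which is why the sketch completes the target explicitly. If the target should apply back-pressure, setting BoundedCapacity on it makes SendAsync wait until there is room.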

Thanks for your report!