System.Runtime.CompilerServices.AsyncTaskMethodBuilder::Start 有大量的自我时间

System.Runtime.CompilerServices.AsyncTaskMethodBuilder::Start has significant self time

我是 profiling 我们的 C# .NET 应用程序,我注意到方法 System.Runtime.CompilerServices.AsyncTaskMethodBuilder::Start 出现了多次,占用了大约 3-4 秒的 Self Time我的 1 分钟示例(意味着它在任务基础结构中花费了大约 3-4 秒)。

我了解到编译器使用此方法在 C# 中实现 async/await 语言构造。通常,其中有什么会导致它阻塞或以其他方式占用大量时间?有没有什么方法可以改进我们的方法,使其在这个基础设施上花费的时间更少?

编辑:这里有一个有点冗长但仍然独立的代码示例演示了这个问题,本质上是在两个非常大的数组上进行并行合并排序:

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace AsyncAwaitSelfTimeTest
{
    class Program
    {
        static void Main(string[] args)
        {
            Random random = new Random();

            int[] arrayOne = GenerateArray(50_000_000, random.Next);
            double[] arrayTwo = GenerateArray(50_000_000, random.NextDouble);

            Comparer<int> comparerOne = Comparer<int>.Create((a, b) =>
            {
                if (a < b) return -1;
                else if (a > b) return 1;
                else return 0;
            });
            Comparer<double> comparerTwo = Comparer<double>.Create((a, b) =>
            {
                if (a < b) return -1;
                else if (a > b) return 1;
                else return 0;
            });

            var sortTaskOne = Task.Run(() => MergeSort(arrayOne, 0, arrayOne.Length, comparerOne));
            var sortTaskTwo = Task.Run(() => MergeSort(arrayTwo, 0, arrayTwo.Length, comparerTwo));

            Task.WaitAll(sortTaskOne, sortTaskTwo);
            Console.Write("done sorting");
        }

        static T[] GenerateArray<T>(int length, Func<T> getFunc)
        {
            T[] result = new T[length];
            for (int i = 0; i < length; i++)
            {
                result[i] = getFunc();
            }
            return result;
        }

        static async Task MergeSort<T>(T[] array, int start, int end, Comparer<T> comparer)
        {
            if (end - start <= 16)
            {
                SelectionSort(array, start, end, comparer);
            }
            else
            {
                int mid = start + (end - start) / 2;

                Task firstTask = Task.Run(() => MergeSort(array, start, mid, comparer));
                Task secondTask = Task.Run(() => MergeSort(array, mid, end, comparer));

                await Task.WhenAll(firstTask, secondTask);

                int firstIndex = start;
                int secondIndex = mid;
                T[] dest = new T[end - start];
                for (int i = 0; i < dest.Length; i++)
                {
                    if (firstIndex >= mid)
                    {
                        dest[i] = array[secondIndex++];
                    }
                    else if (secondIndex >= end)
                    {
                        dest[i] = array[firstIndex++];
                    }
                    else if (comparer.Compare(array[firstIndex], array[secondIndex]) < 0)
                    {
                        dest[i] = array[firstIndex++];
                    }
                    else
                    {
                        dest[i] = array[secondIndex++];
                    }
                }

                dest.CopyTo(array, start);
            }
        }

        static void SelectionSort<T>(T[] array, int start, int end, Comparer<T> comparer)
        {
            // note: using selection sort here to prevent time variability
            for (int i = start; i < end; i++)
            {
                int minIndex = i;
                for (int j = i + 1; j < end; j++)
                {
                    if (comparer.Compare(array[j], array[minIndex]) < 0)
                    {
                        minIndex = j;
                    }
                }
                T temp = array[i];
                array[i] = array[minIndex];
                array[minIndex] = temp;
            }
        }
    }
}

在这段代码的性能配置文件中,System.Runtime.CompilerServices.AsyncTaskMethodBuilder::Start的两个副本(每个通用值类型一个)占用了大部分自处理器时间,只有两个MergeSort方法占用自我处理器时间的一小部分。当未使用 Task.Run 时(因此仅使用单个处理器)也注意到类似的行为。

编辑 2: 非常感谢您到目前为止的回答。我最初认为 Task<TResult> 被使用的事实是问题的一部分(因为它在原始代码中被使用),因此我的结构是复制数组而不是就地排序。但是,我现在认识到那是无关紧要的,所以我更改了上面的代码片段,改为在适当的位置进行合并排序。我还通过引入一个非平凡的顺序截止(为了严格限制时间而进行选择排序)以及使用 Comparer 对象来防止数组元素的装箱分配(从而减少垃圾收集器造成的分析干扰)。

但是,相同的模式,即 AsyncTaskMethodBuilder::Start 占用大量自我时间的模式仍然存在,并且仍然可以在分析结果中找到。

编辑 3: 澄清一下,我 am/was 寻找的答案不是“为什么这段代码很慢?”,而是“为什么 .NET 分析器告诉我大部分成本都花在了我无法控制的方法中?”接受的答案帮助我确定了问题,即大部分逻辑都在探查器不包含的生成类型中。

有意思。我拿了你的样本并将 MergeSort 异步方法更改为非异步方法。现在一个分析会话需要大约 33 秒才能完成(异步版本需要大约 36 秒,两者都使用 Release 配置)。非异步版本如下所示:

    static Task<T[]> MergeSort<T>(T[] src) where T : IComparable<T>
    {
        if (src.Length <= 1)
        {
            return Task.FromResult(src);
        }
        else
        {
            int mid = src.Length / 2;
            T[] firstHalf = new T[mid];
            T[] secondHalf = new T[src.Length - mid];
            Array.Copy(src, 0, firstHalf, 0, mid);
            Array.Copy(src, mid, secondHalf, 0, src.Length - mid);

            Task<T[]> firstTask = Task.Run(() => MergeSort(firstHalf));
            Task<T[]> secondTask = Task.Run(() => MergeSort(secondHalf));

            return Task.WhenAll(firstTask, secondTask).ContinueWith(
                continuationFunction: _ =>
                {
                    T[] firstDest = firstTask.Result;
                    T[] secondDest = secondTask.Result;
                    int firstIndex = 0;
                    int secondIndex = 0;

                    T[] dest = new T[src.Length];
                    for (int i = 0; i < dest.Length; i++)
                    {
                        if (firstIndex >= firstDest.Length)
                        {
                            dest[i] = secondDest[secondIndex++];
                        }
                        else if (secondIndex >= secondDest.Length)
                        {
                            dest[i] = firstDest[firstIndex++];
                        }
                        else if (firstDest[firstIndex].CompareTo(secondDest[secondIndex]) < 0)
                        {
                            dest[i] = firstDest[firstIndex++];
                        }
                        else
                        {
                            dest[i] = secondDest[secondIndex++];
                        }
                    }

                    return dest;
                },
                cancellationToken: System.Threading.CancellationToken.None,
                continuationOptions: TaskContinuationOptions.ExecuteSynchronously,
                scheduler: TaskScheduler.Default);
        }
    }

因此,对于这个特定示例,async/await 开销似乎约为 3 秒。这超出了我的预期,但肯定不是这里的瓶颈。

关于这个观察:

In the performance profile of this code, the two copies of System.Runtime.CompilerServices.AsyncTaskMethodBuilder::Start (one for each generic value type) take up most of the self-processor time, with the two MergeSort methods only taking up a very small fraction of the self-processor time.

我没有分析编译器为这个特定的异步方法生成的代码,但我怀疑 MergeSort 只包含一个简短的 prolog/epilog 代码,而实际的 CPU 密集代码由 AsyncTaskMethodBuilder::Start.

间接调用

What, generally, is in it that would cause it to block or otherwise take up a lot of time?

Every asynchronous method starts execution synchronously。在异步状态机方面,您可以将您的方法视为 "split" 分成多个部分,每个 await 是一个分割点。

所以,我希望 Start 会执行 MergeSort 直到第一个 await - 即创建两个数组并将整个输入数组复制到其中。

您遇到的问题是您生成的任务太多,使正常任务池过载,导致 .NET 生成额外的任务。由于您一直在创建新任务,直到数组的长度为 1AsyncTaskMethodBuilder::Start 开始成为重要的时间消耗者,一旦它需要创建新任务以继续执行并且不能重用池中的任务。

您需要更改几处才能使您的函数具有一定的性能:

第一:清理你await

Task<T[]> firstTask = Task.Run(() => MergeSort(firstHalf));
Task<T[]> secondTask = Task.Run(() => MergeSort(secondHalf));

await Task.WhenAll(firstTask, secondTask);

T[] firstDest = await firstTask;
T[] secondDest = await secondTask;

这已经是个问题了。请记住,每个 await 都很重要。如果Task已经完成的事件,await此时仍然拆分函数,释放当前Task并在新的Task中继续剩余的函数。这种转变需要时间。不多,但这种情况在您的职能中经常发生,并且可以衡量。

Task.WhenAll 已经 returns 您需要的结果值。

Task<T[]> firstTask = Task.Run(() => MergeSort(firstHalf));
Task<T[]> secondTask = Task.Run(() => MergeSort(secondHalf));

T[][] dests = await Task.WhenAll(firstTask, secondTask);

T[] firstDest = dests[0];
T[] secondDest = dests[1];

这样可以减少函数中的任务切换次数。

其次:减少创建的Task个实例的数量。

任务是在不同 CPU 核心上分配工作的好工具,但您必须确保它们很忙。创建一个新的 Task 是有难度的,您必须确保它是值得的。

我建议在创建新 Task 的点上添加一个阈值。如果您正在处理的部分变得太小,您不应该创建新的 Task 实例,而是直接调用函数。

例如:

T[] firstDest;
T[] secondDest;
if (mid > 100) 
{
  Task<T[]> firstTask = Task.Run(() => MergeSort(firstHalf));
  Task<T[]> secondTask = Task.Run(() => MergeSort(secondHalf));

  T[][] dests = await Task.WhenAll(firstTask, secondTask);

  firstDest = dests[0];
  secondDest = dests[1];
} 
else 
{
  firstDest = MergeSort(firstHalf);
  secondDest = MergeSort(secondHalf);
}

您应该尝试不同的值,看看这有何改变。 100 只是我开始时使用的一个值,但您可以选择任何其他值。这将大大减少没有多少工作要做的任务。基本上,该值决定了要处理的一项任务可以接受多少剩余工作。

最后,您应该考虑以不同方式处理 array 实例。如果你告诉你的函数开始位置和他们期望工作的数组部分的长度,你应该能够进一步提高性能,因为你不必复制数组数千次。

正如其他人所说,在紧密循环中调用时,使用 async/await 具有可测量的开销。你不应该需要 Tasks 来完成简单的 CPU-绑定工作,比如排序数组。

如果异步方法通常同步完成,可能可以通过在 C# 7 中使用 ValueTask 来提高性能。

您的示例代码的主要问题是 MergeSort 分配了太多的临时垃圾。通过切换到内置排序方法,我看到了 40 倍的加速,例如:

Array.Sort(arrayOne)