System.Runtime.CompilerServices.AsyncTaskMethodBuilder::Start 有大量的自我时间
System.Runtime.CompilerServices.AsyncTaskMethodBuilder::Start has significant self time
我是 profiling 我们的 C# .NET 应用程序,我注意到方法 System.Runtime.CompilerServices.AsyncTaskMethodBuilder::Start
出现了多次,占用了大约 3-4 秒的 Self Time我的 1 分钟示例(意味着它在任务基础结构中花费了大约 3-4 秒)。
我了解到编译器使用此方法在 C# 中实现 async
/await
语言构造。通常,其中有什么会导致它阻塞或以其他方式占用大量时间?有没有什么方法可以改进我们的方法,使其在这个基础设施上花费的时间更少?
编辑:这里有一个有点冗长但仍然独立的代码示例演示了这个问题,本质上是在两个非常大的数组上进行并行合并排序:
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace AsyncAwaitSelfTimeTest
{
class Program
{
static void Main(string[] args)
{
Random random = new Random();
int[] arrayOne = GenerateArray(50_000_000, random.Next);
double[] arrayTwo = GenerateArray(50_000_000, random.NextDouble);
Comparer<int> comparerOne = Comparer<int>.Create((a, b) =>
{
if (a < b) return -1;
else if (a > b) return 1;
else return 0;
});
Comparer<double> comparerTwo = Comparer<double>.Create((a, b) =>
{
if (a < b) return -1;
else if (a > b) return 1;
else return 0;
});
var sortTaskOne = Task.Run(() => MergeSort(arrayOne, 0, arrayOne.Length, comparerOne));
var sortTaskTwo = Task.Run(() => MergeSort(arrayTwo, 0, arrayTwo.Length, comparerTwo));
Task.WaitAll(sortTaskOne, sortTaskTwo);
Console.Write("done sorting");
}
static T[] GenerateArray<T>(int length, Func<T> getFunc)
{
T[] result = new T[length];
for (int i = 0; i < length; i++)
{
result[i] = getFunc();
}
return result;
}
static async Task MergeSort<T>(T[] array, int start, int end, Comparer<T> comparer)
{
if (end - start <= 16)
{
SelectionSort(array, start, end, comparer);
}
else
{
int mid = start + (end - start) / 2;
Task firstTask = Task.Run(() => MergeSort(array, start, mid, comparer));
Task secondTask = Task.Run(() => MergeSort(array, mid, end, comparer));
await Task.WhenAll(firstTask, secondTask);
int firstIndex = start;
int secondIndex = mid;
T[] dest = new T[end - start];
for (int i = 0; i < dest.Length; i++)
{
if (firstIndex >= mid)
{
dest[i] = array[secondIndex++];
}
else if (secondIndex >= end)
{
dest[i] = array[firstIndex++];
}
else if (comparer.Compare(array[firstIndex], array[secondIndex]) < 0)
{
dest[i] = array[firstIndex++];
}
else
{
dest[i] = array[secondIndex++];
}
}
dest.CopyTo(array, start);
}
}
static void SelectionSort<T>(T[] array, int start, int end, Comparer<T> comparer)
{
// note: using selection sort here to prevent time variability
for (int i = start; i < end; i++)
{
int minIndex = i;
for (int j = i + 1; j < end; j++)
{
if (comparer.Compare(array[j], array[minIndex]) < 0)
{
minIndex = j;
}
}
T temp = array[i];
array[i] = array[minIndex];
array[minIndex] = temp;
}
}
}
}
在这段代码的性能配置文件中,System.Runtime.CompilerServices.AsyncTaskMethodBuilder::Start
的两个副本(每个通用值类型一个)占用了大部分自处理器时间,只有两个MergeSort
方法占用自我处理器时间的一小部分。当未使用 Task.Run
时(因此仅使用单个处理器)也注意到类似的行为。
编辑 2: 非常感谢您到目前为止的回答。我最初认为 Task<TResult>
被使用的事实是问题的一部分(因为它在原始代码中被使用),因此我的结构是复制数组而不是就地排序。但是,我现在认识到那是无关紧要的,所以我更改了上面的代码片段,改为在适当的位置进行合并排序。我还通过引入一个非平凡的顺序截止(为了严格限制时间而进行选择排序)以及使用 Comparer
对象来防止数组元素的装箱分配(从而减少垃圾收集器造成的分析干扰)。
但是,相同的模式,即 AsyncTaskMethodBuilder::Start
占用大量自我时间的模式仍然存在,并且仍然可以在分析结果中找到。
编辑 3: 澄清一下,我 am/was 寻找的答案不是“为什么这段代码很慢?”,而是“为什么 .NET 分析器告诉我大部分成本都花在了我无法控制的方法中?”接受的答案帮助我确定了问题,即大部分逻辑都在探查器不包含的生成类型中。
有意思。我拿了你的样本并将 MergeSort
异步方法更改为非异步方法。现在一个分析会话需要大约 33 秒才能完成(异步版本需要大约 36 秒,两者都使用 Release 配置)。非异步版本如下所示:
static Task<T[]> MergeSort<T>(T[] src) where T : IComparable<T>
{
if (src.Length <= 1)
{
return Task.FromResult(src);
}
else
{
int mid = src.Length / 2;
T[] firstHalf = new T[mid];
T[] secondHalf = new T[src.Length - mid];
Array.Copy(src, 0, firstHalf, 0, mid);
Array.Copy(src, mid, secondHalf, 0, src.Length - mid);
Task<T[]> firstTask = Task.Run(() => MergeSort(firstHalf));
Task<T[]> secondTask = Task.Run(() => MergeSort(secondHalf));
return Task.WhenAll(firstTask, secondTask).ContinueWith(
continuationFunction: _ =>
{
T[] firstDest = firstTask.Result;
T[] secondDest = secondTask.Result;
int firstIndex = 0;
int secondIndex = 0;
T[] dest = new T[src.Length];
for (int i = 0; i < dest.Length; i++)
{
if (firstIndex >= firstDest.Length)
{
dest[i] = secondDest[secondIndex++];
}
else if (secondIndex >= secondDest.Length)
{
dest[i] = firstDest[firstIndex++];
}
else if (firstDest[firstIndex].CompareTo(secondDest[secondIndex]) < 0)
{
dest[i] = firstDest[firstIndex++];
}
else
{
dest[i] = secondDest[secondIndex++];
}
}
return dest;
},
cancellationToken: System.Threading.CancellationToken.None,
continuationOptions: TaskContinuationOptions.ExecuteSynchronously,
scheduler: TaskScheduler.Default);
}
}
因此,对于这个特定示例,async/await 开销似乎约为 3 秒。这超出了我的预期,但肯定不是这里的瓶颈。
关于这个观察:
In the performance profile of this code, the two copies of
System.Runtime.CompilerServices.AsyncTaskMethodBuilder::Start (one for
each generic value type) take up most of the self-processor time, with
the two MergeSort methods only taking up a very small fraction of the
self-processor time.
我没有分析编译器为这个特定的异步方法生成的代码,但我怀疑 MergeSort
只包含一个简短的 prolog/epilog 代码,而实际的 CPU 密集代码由 AsyncTaskMethodBuilder::Start
.
间接调用
What, generally, is in it that would cause it to block or otherwise take up a lot of time?
Every asynchronous method starts execution synchronously。在异步状态机方面,您可以将您的方法视为 "split" 分成多个部分,每个 await
是一个分割点。
所以,我希望 Start
会执行 MergeSort
直到第一个 await
- 即创建两个数组并将整个输入数组复制到其中。
您遇到的问题是您生成的任务太多,使正常任务池过载,导致 .NET 生成额外的任务。由于您一直在创建新任务,直到数组的长度为 1
。 AsyncTaskMethodBuilder::Start
开始成为重要的时间消耗者,一旦它需要创建新任务以继续执行并且不能重用池中的任务。
您需要更改几处才能使您的函数具有一定的性能:
第一:清理你await
的
Task<T[]> firstTask = Task.Run(() => MergeSort(firstHalf));
Task<T[]> secondTask = Task.Run(() => MergeSort(secondHalf));
await Task.WhenAll(firstTask, secondTask);
T[] firstDest = await firstTask;
T[] secondDest = await secondTask;
这已经是个问题了。请记住,每个 await
都很重要。如果Task
已经完成的事件,await
此时仍然拆分函数,释放当前Task
并在新的Task
中继续剩余的函数。这种转变需要时间。不多,但这种情况在您的职能中经常发生,并且可以衡量。
Task.WhenAll
已经 returns 您需要的结果值。
Task<T[]> firstTask = Task.Run(() => MergeSort(firstHalf));
Task<T[]> secondTask = Task.Run(() => MergeSort(secondHalf));
T[][] dests = await Task.WhenAll(firstTask, secondTask);
T[] firstDest = dests[0];
T[] secondDest = dests[1];
这样可以减少函数中的任务切换次数。
其次:减少创建的Task
个实例的数量。
任务是在不同 CPU 核心上分配工作的好工具,但您必须确保它们很忙。创建一个新的 Task
是有难度的,您必须确保它是值得的。
我建议在创建新 Task
的点上添加一个阈值。如果您正在处理的部分变得太小,您不应该创建新的 Task
实例,而是直接调用函数。
例如:
T[] firstDest;
T[] secondDest;
if (mid > 100)
{
Task<T[]> firstTask = Task.Run(() => MergeSort(firstHalf));
Task<T[]> secondTask = Task.Run(() => MergeSort(secondHalf));
T[][] dests = await Task.WhenAll(firstTask, secondTask);
firstDest = dests[0];
secondDest = dests[1];
}
else
{
firstDest = MergeSort(firstHalf);
secondDest = MergeSort(secondHalf);
}
您应该尝试不同的值,看看这有何改变。 100
只是我开始时使用的一个值,但您可以选择任何其他值。这将大大减少没有多少工作要做的任务。基本上,该值决定了要处理的一项任务可以接受多少剩余工作。
最后,您应该考虑以不同方式处理 array
实例。如果你告诉你的函数开始位置和他们期望工作的数组部分的长度,你应该能够进一步提高性能,因为你不必复制数组数千次。
正如其他人所说,在紧密循环中调用时,使用 async/await 具有可测量的开销。你不应该需要 Tasks 来完成简单的 CPU-绑定工作,比如排序数组。
如果异步方法通常同步完成,可能可以通过在 C# 7 中使用 ValueTask 来提高性能。
您的示例代码的主要问题是 MergeSort 分配了太多的临时垃圾。通过切换到内置排序方法,我看到了 40 倍的加速,例如:
Array.Sort(arrayOne)
我是 profiling 我们的 C# .NET 应用程序,我注意到方法 System.Runtime.CompilerServices.AsyncTaskMethodBuilder::Start
出现了多次,占用了大约 3-4 秒的 Self Time我的 1 分钟示例(意味着它在任务基础结构中花费了大约 3-4 秒)。
我了解到编译器使用此方法在 C# 中实现 async
/await
语言构造。通常,其中有什么会导致它阻塞或以其他方式占用大量时间?有没有什么方法可以改进我们的方法,使其在这个基础设施上花费的时间更少?
编辑:这里有一个有点冗长但仍然独立的代码示例演示了这个问题,本质上是在两个非常大的数组上进行并行合并排序:
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace AsyncAwaitSelfTimeTest
{
class Program
{
static void Main(string[] args)
{
Random random = new Random();
int[] arrayOne = GenerateArray(50_000_000, random.Next);
double[] arrayTwo = GenerateArray(50_000_000, random.NextDouble);
Comparer<int> comparerOne = Comparer<int>.Create((a, b) =>
{
if (a < b) return -1;
else if (a > b) return 1;
else return 0;
});
Comparer<double> comparerTwo = Comparer<double>.Create((a, b) =>
{
if (a < b) return -1;
else if (a > b) return 1;
else return 0;
});
var sortTaskOne = Task.Run(() => MergeSort(arrayOne, 0, arrayOne.Length, comparerOne));
var sortTaskTwo = Task.Run(() => MergeSort(arrayTwo, 0, arrayTwo.Length, comparerTwo));
Task.WaitAll(sortTaskOne, sortTaskTwo);
Console.Write("done sorting");
}
static T[] GenerateArray<T>(int length, Func<T> getFunc)
{
T[] result = new T[length];
for (int i = 0; i < length; i++)
{
result[i] = getFunc();
}
return result;
}
static async Task MergeSort<T>(T[] array, int start, int end, Comparer<T> comparer)
{
if (end - start <= 16)
{
SelectionSort(array, start, end, comparer);
}
else
{
int mid = start + (end - start) / 2;
Task firstTask = Task.Run(() => MergeSort(array, start, mid, comparer));
Task secondTask = Task.Run(() => MergeSort(array, mid, end, comparer));
await Task.WhenAll(firstTask, secondTask);
int firstIndex = start;
int secondIndex = mid;
T[] dest = new T[end - start];
for (int i = 0; i < dest.Length; i++)
{
if (firstIndex >= mid)
{
dest[i] = array[secondIndex++];
}
else if (secondIndex >= end)
{
dest[i] = array[firstIndex++];
}
else if (comparer.Compare(array[firstIndex], array[secondIndex]) < 0)
{
dest[i] = array[firstIndex++];
}
else
{
dest[i] = array[secondIndex++];
}
}
dest.CopyTo(array, start);
}
}
static void SelectionSort<T>(T[] array, int start, int end, Comparer<T> comparer)
{
// note: using selection sort here to prevent time variability
for (int i = start; i < end; i++)
{
int minIndex = i;
for (int j = i + 1; j < end; j++)
{
if (comparer.Compare(array[j], array[minIndex]) < 0)
{
minIndex = j;
}
}
T temp = array[i];
array[i] = array[minIndex];
array[minIndex] = temp;
}
}
}
}
在这段代码的性能配置文件中,System.Runtime.CompilerServices.AsyncTaskMethodBuilder::Start
的两个副本(每个通用值类型一个)占用了大部分自处理器时间,只有两个MergeSort
方法占用自我处理器时间的一小部分。当未使用 Task.Run
时(因此仅使用单个处理器)也注意到类似的行为。
编辑 2: 非常感谢您到目前为止的回答。我最初认为 Task<TResult>
被使用的事实是问题的一部分(因为它在原始代码中被使用),因此我的结构是复制数组而不是就地排序。但是,我现在认识到那是无关紧要的,所以我更改了上面的代码片段,改为在适当的位置进行合并排序。我还通过引入一个非平凡的顺序截止(为了严格限制时间而进行选择排序)以及使用 Comparer
对象来防止数组元素的装箱分配(从而减少垃圾收集器造成的分析干扰)。
但是,相同的模式,即 AsyncTaskMethodBuilder::Start
占用大量自我时间的模式仍然存在,并且仍然可以在分析结果中找到。
编辑 3: 澄清一下,我 am/was 寻找的答案不是“为什么这段代码很慢?”,而是“为什么 .NET 分析器告诉我大部分成本都花在了我无法控制的方法中?”接受的答案帮助我确定了问题,即大部分逻辑都在探查器不包含的生成类型中。
有意思。我拿了你的样本并将 MergeSort
异步方法更改为非异步方法。现在一个分析会话需要大约 33 秒才能完成(异步版本需要大约 36 秒,两者都使用 Release 配置)。非异步版本如下所示:
static Task<T[]> MergeSort<T>(T[] src) where T : IComparable<T>
{
if (src.Length <= 1)
{
return Task.FromResult(src);
}
else
{
int mid = src.Length / 2;
T[] firstHalf = new T[mid];
T[] secondHalf = new T[src.Length - mid];
Array.Copy(src, 0, firstHalf, 0, mid);
Array.Copy(src, mid, secondHalf, 0, src.Length - mid);
Task<T[]> firstTask = Task.Run(() => MergeSort(firstHalf));
Task<T[]> secondTask = Task.Run(() => MergeSort(secondHalf));
return Task.WhenAll(firstTask, secondTask).ContinueWith(
continuationFunction: _ =>
{
T[] firstDest = firstTask.Result;
T[] secondDest = secondTask.Result;
int firstIndex = 0;
int secondIndex = 0;
T[] dest = new T[src.Length];
for (int i = 0; i < dest.Length; i++)
{
if (firstIndex >= firstDest.Length)
{
dest[i] = secondDest[secondIndex++];
}
else if (secondIndex >= secondDest.Length)
{
dest[i] = firstDest[firstIndex++];
}
else if (firstDest[firstIndex].CompareTo(secondDest[secondIndex]) < 0)
{
dest[i] = firstDest[firstIndex++];
}
else
{
dest[i] = secondDest[secondIndex++];
}
}
return dest;
},
cancellationToken: System.Threading.CancellationToken.None,
continuationOptions: TaskContinuationOptions.ExecuteSynchronously,
scheduler: TaskScheduler.Default);
}
}
因此,对于这个特定示例,async/await 开销似乎约为 3 秒。这超出了我的预期,但肯定不是这里的瓶颈。
关于这个观察:
In the performance profile of this code, the two copies of System.Runtime.CompilerServices.AsyncTaskMethodBuilder::Start (one for each generic value type) take up most of the self-processor time, with the two MergeSort methods only taking up a very small fraction of the self-processor time.
我没有分析编译器为这个特定的异步方法生成的代码,但我怀疑 MergeSort
只包含一个简短的 prolog/epilog 代码,而实际的 CPU 密集代码由 AsyncTaskMethodBuilder::Start
.
What, generally, is in it that would cause it to block or otherwise take up a lot of time?
Every asynchronous method starts execution synchronously。在异步状态机方面,您可以将您的方法视为 "split" 分成多个部分,每个 await
是一个分割点。
所以,我希望 Start
会执行 MergeSort
直到第一个 await
- 即创建两个数组并将整个输入数组复制到其中。
您遇到的问题是您生成的任务太多,使正常任务池过载,导致 .NET 生成额外的任务。由于您一直在创建新任务,直到数组的长度为 1
。 AsyncTaskMethodBuilder::Start
开始成为重要的时间消耗者,一旦它需要创建新任务以继续执行并且不能重用池中的任务。
您需要更改几处才能使您的函数具有一定的性能:
第一:清理你await
的
Task<T[]> firstTask = Task.Run(() => MergeSort(firstHalf));
Task<T[]> secondTask = Task.Run(() => MergeSort(secondHalf));
await Task.WhenAll(firstTask, secondTask);
T[] firstDest = await firstTask;
T[] secondDest = await secondTask;
这已经是个问题了。请记住,每个 await
都很重要。如果Task
已经完成的事件,await
此时仍然拆分函数,释放当前Task
并在新的Task
中继续剩余的函数。这种转变需要时间。不多,但这种情况在您的职能中经常发生,并且可以衡量。
Task.WhenAll
已经 returns 您需要的结果值。
Task<T[]> firstTask = Task.Run(() => MergeSort(firstHalf));
Task<T[]> secondTask = Task.Run(() => MergeSort(secondHalf));
T[][] dests = await Task.WhenAll(firstTask, secondTask);
T[] firstDest = dests[0];
T[] secondDest = dests[1];
这样可以减少函数中的任务切换次数。
其次:减少创建的Task
个实例的数量。
任务是在不同 CPU 核心上分配工作的好工具,但您必须确保它们很忙。创建一个新的 Task
是有难度的,您必须确保它是值得的。
我建议在创建新 Task
的点上添加一个阈值。如果您正在处理的部分变得太小,您不应该创建新的 Task
实例,而是直接调用函数。
例如:
T[] firstDest;
T[] secondDest;
if (mid > 100)
{
Task<T[]> firstTask = Task.Run(() => MergeSort(firstHalf));
Task<T[]> secondTask = Task.Run(() => MergeSort(secondHalf));
T[][] dests = await Task.WhenAll(firstTask, secondTask);
firstDest = dests[0];
secondDest = dests[1];
}
else
{
firstDest = MergeSort(firstHalf);
secondDest = MergeSort(secondHalf);
}
您应该尝试不同的值,看看这有何改变。 100
只是我开始时使用的一个值,但您可以选择任何其他值。这将大大减少没有多少工作要做的任务。基本上,该值决定了要处理的一项任务可以接受多少剩余工作。
最后,您应该考虑以不同方式处理 array
实例。如果你告诉你的函数开始位置和他们期望工作的数组部分的长度,你应该能够进一步提高性能,因为你不必复制数组数千次。
正如其他人所说,在紧密循环中调用时,使用 async/await 具有可测量的开销。你不应该需要 Tasks 来完成简单的 CPU-绑定工作,比如排序数组。
如果异步方法通常同步完成,可能可以通过在 C# 7 中使用 ValueTask 来提高性能。
您的示例代码的主要问题是 MergeSort 分配了太多的临时垃圾。通过切换到内置排序方法,我看到了 40 倍的加速,例如:
Array.Sort(arrayOne)