Why is Task.WhenAny so slow when called many times against the same TaskCompletionSource?

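If a class has a long-lived member TaskCompletionSource<TResult> m_tcs, and Task.WhenAny is called with m_tcs.Task as one of its arguments, performance seems to degrade sharply, far worse than linearly, once the number of calls exceeds about 50,000.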

Why is it so slow in this case? Is there an alternative that runs faster without using 4x more memory?

My guess is that Task.WhenAny adds and removes a very large number of continuations on m_tcs.Task, and that somewhere in that process the cost becomes O(N²).
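
To illustrate how such a hypothesis could lead to quadratic cost, here is a toy model. It is only a sketch under an assumption that has not been verified against the real Task implementation: callbacks are kept in one flat list, and each removal does a linear scan and leaves a hole behind. The class name ContinuationListModel and everything in it are hypothetical.

```csharp
using System;
using System.Collections.Generic;

// Toy model only: this is NOT the real Task implementation.
public static class ContinuationListModel
{
    // Marker left in a slot after its callback has been unregistered.
    private static readonly object Hole = new object();

    // Registers n callbacks on one shared list, then unregisters them in
    // registration order. Every unregistration scans from the start of the
    // list, stepping over the holes left by earlier removals.
    public static long CountScanSteps(int n)
    {
        var continuations = new List<object>(n);
        var handles = new object[n];
        for (int i = 0; i < n; i++)
        {
            handles[i] = new object();
            continuations.Add(handles[i]);
        }

        long steps = 0;
        for (int i = 0; i < n; i++)
        {
            for (int j = 0; j < continuations.Count; j++)
            {
                steps++;
                if (ReferenceEquals(continuations[j], handles[i]))
                {
                    continuations[j] = Hole; // slot is kept, so the list never shrinks
                    break;
                }
            }
        }
        return steps;
    }
}
```

In this model the i-th removal costs i + 1 steps, so the total is n(n + 1)/2: CountScanSteps(1000) returns 500,500 and CountScanSteps(2000) returns 2,001,000. Doubling n roughly quadruples the work, which is the kind of growth guessed at above.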

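I found a better-performing alternative by wrapping the TCS in an async function that awaits m_tcs.Task. It uses about 4x the memory, but runs much faster once the iteration count goes beyond roughly 20,000.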

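Sample code is below (for accurate results, compile it and run the .exe directly, without the debugger attached). Note that WhenAnyMemberTcsDirect has the performance problem, WhenAnyMemberTcsIndirect is the faster alternative, and WhenAnyLocalTcs is the baseline for comparison: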

using System;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;

public class WithTcs
{
    // long-lived TaskCompletionSource
    private readonly TaskCompletionSource<bool> m_tcs = new TaskCompletionSource<bool>();

    // this has performance issues for large N - O(N^2)
    public async Task WhenAnyMemberTcsDirectAsync(Task task)
    {
        await await Task.WhenAny(task, m_tcs.Task).ConfigureAwait(false);
    }

    // performs faster - O(N), but uses 4x memory
    public async Task WhenAnyMemberTcsIndirectAsync(Task task)
    {
        await await Task.WhenAny(task, AwaitTcsTaskAsync(m_tcs)).ConfigureAwait(false);
    }

    private async Task<TResult> AwaitTcsTaskAsync<TResult>(TaskCompletionSource<TResult> tcs)
    {
        return await tcs.Task.ConfigureAwait(false);
    }

    // baseline for comparison using short-lived TCS
    public async Task WhenAnyLocalTcsAsync(Task task)
    {
        var tcs = new TaskCompletionSource<bool>();
        await await Task.WhenAny(task, tcs.Task).ConfigureAwait(false);
    }
}

class Program
{
    static void Main(string[] args)
    {
        show_warning_if_debugger_attached();

        MainAsync().GetAwaiter().GetResult();

        show_warning_if_debugger_attached();
        Console.ReadLine();
    }

    static async Task MainAsync()
    {
        const int n = 100000;

        Console.WriteLine("Running Task.WhenAny tests ({0:#,0} iterations)", n);
        Console.WriteLine();

        await WhenAnyLocalTcs(n).ConfigureAwait(false);

        await Task.Delay(1000).ConfigureAwait(false);

        await WhenAnyMemberTcsIndirect(n).ConfigureAwait(false);

        await Task.Delay(1000).ConfigureAwait(false);

        await WhenAnyMemberTcsDirect(n).ConfigureAwait(false);
    }

    static Task WhenAnyLocalTcs(int n)
    {
        Func<WithTcs, Task, Task> function =
            (instance, task) => instance.WhenAnyLocalTcsAsync(task);

        return RunTestAsync(n, function);
    }

    static Task WhenAnyMemberTcsIndirect(int n)
    {
        Func<WithTcs, Task, Task> function =
            (instance, task) => instance.WhenAnyMemberTcsIndirectAsync(task);

        return RunTestAsync(n, function);
    }

    static Task WhenAnyMemberTcsDirect(int n)
    {
        Func<WithTcs, Task, Task> function =
            (instance, task) => instance.WhenAnyMemberTcsDirectAsync(task);

        return RunTestAsync(n, function);
    }

    static async Task RunTestAsync(int n, Func<WithTcs, Task, Task> function, [CallerMemberName] string name = "")
    {
        Console.WriteLine(name);

        var tasks = new Task[n];
        var sw = new Stopwatch();
        var startBytes = GC.GetTotalMemory(true);
        sw.Start();

        var instance = new WithTcs();
        var step = Math.Max(1, n / 78); // progress-dot interval; Math.Max avoids a zero divisor below when n < 78
        for (int i = 0; i < n; i++)
        {
            var iTemp = i;
            Task primaryTask = Task.Run(() => { if (iTemp % step == 0) Console.Write("."); });
            tasks[i] = function(instance, primaryTask);
        }

        await Task.WhenAll(tasks).ConfigureAwait(false);
        Console.WriteLine();

        var endBytes = GC.GetTotalMemory(true);
        sw.Stop();
        GC.KeepAlive(instance);
        GC.KeepAlive(tasks);

        Console.WriteLine("  Time: {0,7:#,0} ms, Memory: {1,10:#,0} bytes", sw.ElapsedMilliseconds, endBytes - startBytes);
        Console.WriteLine();
    }

    static void show_warning_if_debugger_attached()
    {
        if (Debugger.IsAttached)
            Console.WriteLine("WARNING: running with the debugger attached may result in inaccurate results\r\n".ToUpper());
    }
}

Sample results:

Iterations | WhenAny* Method   | Time (ms) | Memory (bytes)
---------: | ----------------- | --------: | -------------:
     1,000 | LocalTcs          |        21 |         58,248
     1,000 | MemberTcsIndirect |        54 |        217,268
     1,000 | MemberTcsDirect   |        21 |         52,496
    10,000 | LocalTcs          |        91 |        545,836
    10,000 | MemberTcsIndirect |        98 |      2,141,836
    10,000 | MemberTcsDirect   |       140 |        545,640
   100,000 | LocalTcs          |       210 |      4,898,512
   100,000 | MemberTcsIndirect |       502 |     21,426,316
   100,000 | MemberTcsDirect   |    14,090 |      5,085,396
   200,000 | LocalTcs          |       366 |      9,630,872
   200,000 | MemberTcsIndirect |       659 |     41,450,916
   200,000 | MemberTcsDirect   |    42,599 |     10,069,248
   500,000 | LocalTcs          |       808 |     23,670,492
   500,000 | MemberTcsIndirect |     1,906 |     97,339,192
   500,000 | MemberTcsDirect   |   288,373 |     24,968,436
 1,000,000 | LocalTcs          |     1,642 |     47,272,744
 1,000,000 | MemberTcsIndirect |     3,149 |    200,480,888
 1,000,000 | MemberTcsDirect   | 1,268,030 |     48,064,772

Note: targeting .NET 4.6.2 Release (Any CPU), tested on Windows 7 SP1 64-bit, Intel Core i7-4770.
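
As a rough check on the O(N²) guess, the growth exponent can be estimated from two rows of the table. If time scales like N^k, then k = log(t2/t1) / log(n2/n1). The sketch below uses the 100,000 and 1,000,000 iteration timings from the table above; the ScalingEstimate helper is hypothetical and only does this arithmetic.

```csharp
using System;

public static class ScalingEstimate
{
    // Returns k such that t2 / t1 = (n2 / n1)^k.
    public static double Exponent(double n1, double t1, double n2, double t2)
    {
        return Math.Log(t2 / t1) / Math.Log(n2 / n1);
    }

    // Prints the estimated exponent for each method, using the timings (ms)
    // from the 100,000 and 1,000,000 iteration rows of the results table.
    public static void PrintAll()
    {
        Console.WriteLine("LocalTcs:          {0:F2}", Exponent(100000, 210, 1000000, 1642));
        Console.WriteLine("MemberTcsIndirect: {0:F2}", Exponent(100000, 502, 1000000, 3149));
        Console.WriteLine("MemberTcsDirect:   {0:F2}", Exponent(100000, 14090, 1000000, 1268030));
    }
}
```

With these numbers the exponent is roughly 0.9 for LocalTcs, 0.8 for MemberTcsIndirect, and 1.95 for MemberTcsDirect, which is consistent with approximately linear growth for the first two and approximately quadratic growth for the third. Two data points per method make this a crude estimate only.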

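I found a solution that is both fast (O(N) time) and uses approximately the same amount of memory, by keeping a member CancellationTokenSource m_cts alongside the TaskCompletionSource. Every existing call that sets m_tcs to canceled, faulted, or a result now has to be followed by m_cts.Cancel(). The order matters: m_tcs must already be completed when Cancel() runs, because the registered callback copies whatever state m_tcs.Task is in at that moment. This can of course be abstracted away.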

Workaround:

public class WithTcs
{
    // ... same as above, plus below (CancellationTokenSource requires `using System.Threading;`)

    private readonly CancellationTokenSource m_cts = new CancellationTokenSource();

    public async Task WhenAnyMemberCtsAsync(Task task)
    {
        var ct = m_cts.Token;
        var tcs = new TaskCompletionSource<bool>();
        using (ct.Register(() => tcs.TrySetFrom(m_tcs)))
            await await Task.WhenAny(task, tcs.Task).ConfigureAwait(false);
    }
}

public static class TcsExtensions
{
    public static bool TrySetFrom<TResult>(this TaskCompletionSource<TResult> dest, TaskCompletionSource<TResult> source)
    {
        switch (source.Task.Status)
        {
            case TaskStatus.Canceled:
                return dest.TrySetCanceled();
            case TaskStatus.Faulted:
                return dest.TrySetException(source.Task.Exception.InnerExceptions);
            case TaskStatus.RanToCompletion:
                return dest.TrySetResult(source.Task.Result);
            default:
                return false; // TCS has not yet completed
        }
    }
}
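
The abstraction mentioned above could look something like the following. This is a minimal sketch, not code from the original benchmark: the class name LinkedCompletionSource, its Completion property, and the private helpers are all hypothetical. It pairs the TCS with the CTS so that callers cannot forget the Cancel() call or get the order wrong.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: the class name and members are illustrative
// assumptions, not part of the original post.
public sealed class LinkedCompletionSource<TResult>
{
    private readonly TaskCompletionSource<TResult> m_tcs = new TaskCompletionSource<TResult>();
    private readonly CancellationTokenSource m_cts = new CancellationTokenSource();

    public Task<TResult> Completion { get { return m_tcs.Task; } }

    public bool TrySetResult(TResult result) { return Signal(m_tcs.TrySetResult(result)); }
    public bool TrySetException(Exception error) { return Signal(m_tcs.TrySetException(error)); }
    public bool TrySetCanceled() { return Signal(m_tcs.TrySetCanceled()); }

    // m_tcs is completed first and the token is cancelled second, so the
    // registered callbacks always observe the final state of m_tcs.Task.
    private bool Signal(bool completed)
    {
        if (completed) m_cts.Cancel();
        return completed;
    }

    // Same idea as WhenAnyMemberCtsAsync: wait on a short-lived local TCS
    // that is completed from the long-lived one when the token fires.
    public async Task WhenAnyAsync(Task task)
    {
        var local = new TaskCompletionSource<TResult>();
        using (m_cts.Token.Register(() => CopyTo(local)))
            await await Task.WhenAny(task, local.Task).ConfigureAwait(false);
    }

    // Copies the terminal state of m_tcs.Task to dest; does nothing if
    // m_tcs has not completed yet.
    private void CopyTo(TaskCompletionSource<TResult> dest)
    {
        Task<TResult> source = m_tcs.Task;
        switch (source.Status)
        {
            case TaskStatus.Canceled:
                dest.TrySetCanceled();
                break;
            case TaskStatus.Faulted:
                dest.TrySetException(source.Exception.InnerExceptions);
                break;
            case TaskStatus.RanToCompletion:
                dest.TrySetResult(source.Result);
                break;
        }
    }
}
```

A caller would hold one LinkedCompletionSource<bool> as the long-lived member, call WhenAnyAsync(task) wherever the original code called Task.WhenAny(task, m_tcs.Task), and complete it through TrySetResult, TrySetException, or TrySetCanceled. If the token has already been cancelled when Register is called, the callback runs immediately, so late waiters still complete.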

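This answers the question of whether a fast, memory-efficient alternative exists. I am still curious about what happens behind the scenes in WhenAnyMemberTcsDirect to cause the O(N²) behavior.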