任务调度器:在Task.Factory.StartNew中等待时,线程是否返回到池中?

Task scheduler: when awaiting in a Task.Factory.StartNew, is the thread returned to the pool?

我正在实施一个具有并发上限的工作引擎。我正在使用信号量等待并发性低于最大值,然后使用 Task.Factory.StartNew 将异步处理程序包装在 try/catch 中,并使用 finally 释放信号量。

我意识到这会在线程池上创建线程 - 但我的问题是,当其中一个任务 -运行 线程实际等待(在真正的 IO 调用或等待句柄上)时,线程是否返回到游泳池,正如我所希望的那样?

如果有更好的方法来实现具有有限并发的任务调度程序,其中工作处理程序是异步方法 (returns Task),我也很想听听。或者,理想情况下,如果有一种方法可以将异步方法排队(同样,它是一个 Task-返回异步方法),这比将其包装在同步委托中并将其传递给 Task.Factory.StartNew, 这看起来很完美..?

(这也让我觉得这里有两种并行性:总体上正在处理多少任务,以及在不同线程上并发 运行 有多少延续。具有可配置的可能很酷两者都有选择,虽然不是固定要求..)

编辑:片段:

                    concurrencySemaphore.Wait(cancelToken);
                    deferRelease = false;
                    try
                    {
                        var result = GetWorkItem();
                        if (result == null)
                        { // no work, wait for new work or exit signal
                            signal = WaitHandle.WaitAny(signals);
                            continue;
                        }

                        deferRelease = true;
                        tasks.Add(Task.Factory.StartNew(() =>
                        {
                            try
                            {
                                DoWorkHereAsync(result); // guess I'd think to .GetAwaiter().GetResult() here.. not run this yet
                            }
                            finally
                            {
                                concurrencySemaphore.Release();
                            }
                        }, cancelToken));
                    }
                    finally
                    {
                        if (!deferRelease)
                        {
                            concurrencySemaphore.Release();
                        }
                    }

您可以认为该线程已 return 编辑为 ThreadPool,甚至认为它实际上不是 return。当异步操作开始时,线程简单地选择下一个排队的项目。

我建议您查看 Task.Run 而不是 Task.Factory.StartNew Task.Run vs Task.Factory.StartNew

也可以看看 TPL DataFlow。我认为它会符合您的要求。

这里是一个TaskWorker的例子,它不会产生无数的工作线程。

魔术是通过等待 SemaphoreSlim.WaitAsync() 完成的,这是一个 IO 任务(没有线程)。

class TaskWorker
{
    private readonly SemaphoreSlim _semaphore;

    public TaskWorker(int maxDegreeOfParallelism)
    {
        if (maxDegreeOfParallelism <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism));
        }

        _semaphore = new SemaphoreSlim(maxDegreeOfParallelism, maxDegreeOfParallelism);
    }

    public async Task RunAsync(Func<Task> taskFactory, CancellationToken cancellationToken = default(CancellationToken))
    {
        // No ConfigureAwait(false) here to keep the SyncContext if any
        // for the real task
        await _semaphore.WaitAsync(cancellationToken);
        try
        {
            await taskFactory().ConfigureAwait(false);
        }
        finally
        {
            _semaphore.Release(1);
        }
    }

    public async Task<T> RunAsync<T>(Func<Task<T>> taskFactory, CancellationToken cancellationToken = default(CancellationToken))
    {
        await _semaphore.WaitAsync(cancellationToken);
        try
        {
            return await taskFactory().ConfigureAwait(false);
        }
        finally
        {
            _semaphore.Release(1);
        }
    }
}

和一个简单的控制台应用程序来测试

class Program
{
    static void Main(string[] args)
    {
        var worker = new TaskWorker(1);
        var cts = new CancellationTokenSource();
        var token = cts.Token;

        var tasks = Enumerable.Range(1, 10)
            .Select(e => worker.RunAsync(() => SomeWorkAsync(e, token), token))
            .ToArray();

        Task.WhenAll(tasks).GetAwaiter().GetResult();
    }

    static async Task SomeWorkAsync(int id, CancellationToken cancellationToken)
    {
        Console.WriteLine($"Some Started {id}");
        await Task.Delay(2000, cancellationToken).ConfigureAwait(false);
        Console.WriteLine($"Some Finished {id}");
    }
}

更新

TaskWorker 实施 IDisposable

class TaskWorker : IDisposable
{
    private readonly CancellationTokenSource _cts = new CancellationTokenSource();
    private readonly SemaphoreSlim _semaphore;
    private readonly int _maxDegreeOfParallelism;

    public TaskWorker(int maxDegreeOfParallelism)
    {
        if (maxDegreeOfParallelism <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism));
        }

        _maxDegreeOfParallelism = maxDegreeOfParallelism;
        _semaphore = new SemaphoreSlim(maxDegreeOfParallelism, maxDegreeOfParallelism);
    }

    public async Task RunAsync(Func<Task> taskFactory, CancellationToken cancellationToken = default(CancellationToken))
    {
        ThrowIfDisposed();

        using (var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cts.Token))
        {
            // No ConfigureAwait(false) here to keep the SyncContext if any
            // for the real task
            await _semaphore.WaitAsync(cts.Token);
            try
            {
                await taskFactory().ConfigureAwait(false);
            }
            finally
            {
                _semaphore.Release(1);
            }
        }
    }

    public async Task<T> RunAsync<T>(Func<Task<T>> taskFactory, CancellationToken cancellationToken = default(CancellationToken))
    {
        ThrowIfDisposed();

        using (var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cts.Token))
        {
            await _semaphore.WaitAsync(cts.Token);
            try
            {
                return await taskFactory().ConfigureAwait(false);
            }
            finally
            {
                _semaphore.Release(1);
            }
        }
    }

    private void ThrowIfDisposed()
    {
        if (disposedValue)
        {
            throw new ObjectDisposedException(this.GetType().FullName);
        }
    }

    #region IDisposable Support
    private bool disposedValue = false;

    protected virtual void Dispose(bool disposing)
    {
        if (!disposedValue)
        {
            if (disposing)
            {
                _cts.Cancel();
                // consume all semaphore slots
                for (int i = 0; i < _maxDegreeOfParallelism; i++)
                {
                    _semaphore.WaitAsync().GetAwaiter().GetResult();
                }
                _semaphore.Dispose();
                _cts.Dispose();
            }
            disposedValue = true;
        }
    }

    public void Dispose()
    {
        Dispose(true);
    }
    #endregion
}