任务调度器:在Task.Factory.StartNew中等待时,线程是否返回到池中?
Task scheduler: when awaiting in a Task.Factory.StartNew, is the thread returned to the pool?
我正在实施一个具有并发上限的工作引擎。我正在使用信号量等待并发性低于最大值,然后使用 Task.Factory.StartNew
将异步处理程序包装在 try
/catch
中,并使用 finally
释放信号量。
我意识到这会在线程池上创建线程 - 但我的问题是,当其中一个任务 -运行 线程实际等待(在真正的 IO 调用或等待句柄上)时,线程是否返回到游泳池,正如我所希望的那样?
如果有更好的方法来实现具有有限并发的任务调度程序,其中工作处理程序是异步方法 (returns Task
),我也很想听听。或者,理想情况下,如果有一种方法可以将异步方法排队(同样,它是一个 Task
-返回异步方法),这比将其包装在同步委托中并将其传递给 Task.Factory.StartNew
, 这看起来很完美..?
(这也让我觉得这里有两种并行性:总体上正在处理多少任务,以及在不同线程上并发 运行 有多少延续。具有可配置的可能很酷两者都有选择,虽然不是固定要求..)
编辑:片段:
concurrencySemaphore.Wait(cancelToken);
deferRelease = false;
try
{
var result = GetWorkItem();
if (result == null)
{ // no work, wait for new work or exit signal
signal = WaitHandle.WaitAny(signals);
continue;
}
deferRelease = true;
tasks.Add(Task.Factory.StartNew(() =>
{
try
{
DoWorkHereAsync(result); // guess I'd think to .GetAwaiter().GetResult() here.. not run this yet
}
finally
{
concurrencySemaphore.Release();
}
}, cancelToken));
}
finally
{
if (!deferRelease)
{
concurrencySemaphore.Release();
}
}
您可以认为该线程已 return 编辑为 ThreadPool
,甚至认为它实际上不是 return。当异步操作开始时,线程简单地选择下一个排队的项目。
我建议您查看 Task.Run
而不是 Task.Factory.StartNew
Task.Run vs Task.Factory.StartNew。
也可以看看 TPL DataFlow。我认为它会符合您的要求。
这里是一个TaskWorker的例子,它不会产生无数的工作线程。
魔术是通过等待 SemaphoreSlim.WaitAsync()
完成的,这是一个 IO 任务(没有线程)。
class TaskWorker
{
private readonly SemaphoreSlim _semaphore;
public TaskWorker(int maxDegreeOfParallelism)
{
if (maxDegreeOfParallelism <= 0)
{
throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism));
}
_semaphore = new SemaphoreSlim(maxDegreeOfParallelism, maxDegreeOfParallelism);
}
public async Task RunAsync(Func<Task> taskFactory, CancellationToken cancellationToken = default(CancellationToken))
{
// No ConfigureAwait(false) here to keep the SyncContext if any
// for the real task
await _semaphore.WaitAsync(cancellationToken);
try
{
await taskFactory().ConfigureAwait(false);
}
finally
{
_semaphore.Release(1);
}
}
public async Task<T> RunAsync<T>(Func<Task<T>> taskFactory, CancellationToken cancellationToken = default(CancellationToken))
{
await _semaphore.WaitAsync(cancellationToken);
try
{
return await taskFactory().ConfigureAwait(false);
}
finally
{
_semaphore.Release(1);
}
}
}
和一个简单的控制台应用程序来测试
class Program
{
static void Main(string[] args)
{
var worker = new TaskWorker(1);
var cts = new CancellationTokenSource();
var token = cts.Token;
var tasks = Enumerable.Range(1, 10)
.Select(e => worker.RunAsync(() => SomeWorkAsync(e, token), token))
.ToArray();
Task.WhenAll(tasks).GetAwaiter().GetResult();
}
static async Task SomeWorkAsync(int id, CancellationToken cancellationToken)
{
Console.WriteLine($"Some Started {id}");
await Task.Delay(2000, cancellationToken).ConfigureAwait(false);
Console.WriteLine($"Some Finished {id}");
}
}
更新
TaskWorker
实施 IDisposable
class TaskWorker : IDisposable
{
private readonly CancellationTokenSource _cts = new CancellationTokenSource();
private readonly SemaphoreSlim _semaphore;
private readonly int _maxDegreeOfParallelism;
public TaskWorker(int maxDegreeOfParallelism)
{
if (maxDegreeOfParallelism <= 0)
{
throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism));
}
_maxDegreeOfParallelism = maxDegreeOfParallelism;
_semaphore = new SemaphoreSlim(maxDegreeOfParallelism, maxDegreeOfParallelism);
}
public async Task RunAsync(Func<Task> taskFactory, CancellationToken cancellationToken = default(CancellationToken))
{
ThrowIfDisposed();
using (var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cts.Token))
{
// No ConfigureAwait(false) here to keep the SyncContext if any
// for the real task
await _semaphore.WaitAsync(cts.Token);
try
{
await taskFactory().ConfigureAwait(false);
}
finally
{
_semaphore.Release(1);
}
}
}
public async Task<T> RunAsync<T>(Func<Task<T>> taskFactory, CancellationToken cancellationToken = default(CancellationToken))
{
ThrowIfDisposed();
using (var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cts.Token))
{
await _semaphore.WaitAsync(cts.Token);
try
{
return await taskFactory().ConfigureAwait(false);
}
finally
{
_semaphore.Release(1);
}
}
}
private void ThrowIfDisposed()
{
if (disposedValue)
{
throw new ObjectDisposedException(this.GetType().FullName);
}
}
#region IDisposable Support
private bool disposedValue = false;
protected virtual void Dispose(bool disposing)
{
if (!disposedValue)
{
if (disposing)
{
_cts.Cancel();
// consume all semaphore slots
for (int i = 0; i < _maxDegreeOfParallelism; i++)
{
_semaphore.WaitAsync().GetAwaiter().GetResult();
}
_semaphore.Dispose();
_cts.Dispose();
}
disposedValue = true;
}
}
public void Dispose()
{
Dispose(true);
}
#endregion
}
我正在实施一个具有并发上限的工作引擎。我正在使用信号量等待并发性低于最大值,然后使用 Task.Factory.StartNew
将异步处理程序包装在 try
/catch
中,并使用 finally
释放信号量。
我意识到这会在线程池上创建线程 - 但我的问题是,当其中一个任务 -运行 线程实际等待(在真正的 IO 调用或等待句柄上)时,线程是否返回到游泳池,正如我所希望的那样?
如果有更好的方法来实现具有有限并发的任务调度程序,其中工作处理程序是异步方法 (returns Task
),我也很想听听。或者,理想情况下,如果有一种方法可以将异步方法排队(同样,它是一个 Task
-返回异步方法),这比将其包装在同步委托中并将其传递给 Task.Factory.StartNew
, 这看起来很完美..?
(这也让我觉得这里有两种并行性:总体上正在处理多少任务,以及在不同线程上并发 运行 有多少延续。具有可配置的可能很酷两者都有选择,虽然不是固定要求..)
编辑:片段:
concurrencySemaphore.Wait(cancelToken);
deferRelease = false;
try
{
var result = GetWorkItem();
if (result == null)
{ // no work, wait for new work or exit signal
signal = WaitHandle.WaitAny(signals);
continue;
}
deferRelease = true;
tasks.Add(Task.Factory.StartNew(() =>
{
try
{
DoWorkHereAsync(result); // guess I'd think to .GetAwaiter().GetResult() here.. not run this yet
}
finally
{
concurrencySemaphore.Release();
}
}, cancelToken));
}
finally
{
if (!deferRelease)
{
concurrencySemaphore.Release();
}
}
您可以认为该线程已 return 编辑为 ThreadPool
,甚至认为它实际上不是 return。当异步操作开始时,线程简单地选择下一个排队的项目。
我建议您查看 Task.Run
而不是 Task.Factory.StartNew
Task.Run vs Task.Factory.StartNew。
也可以看看 TPL DataFlow。我认为它会符合您的要求。
这里是一个TaskWorker的例子,它不会产生无数的工作线程。
魔术是通过等待 SemaphoreSlim.WaitAsync()
完成的,这是一个 IO 任务(没有线程)。
class TaskWorker
{
private readonly SemaphoreSlim _semaphore;
public TaskWorker(int maxDegreeOfParallelism)
{
if (maxDegreeOfParallelism <= 0)
{
throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism));
}
_semaphore = new SemaphoreSlim(maxDegreeOfParallelism, maxDegreeOfParallelism);
}
public async Task RunAsync(Func<Task> taskFactory, CancellationToken cancellationToken = default(CancellationToken))
{
// No ConfigureAwait(false) here to keep the SyncContext if any
// for the real task
await _semaphore.WaitAsync(cancellationToken);
try
{
await taskFactory().ConfigureAwait(false);
}
finally
{
_semaphore.Release(1);
}
}
public async Task<T> RunAsync<T>(Func<Task<T>> taskFactory, CancellationToken cancellationToken = default(CancellationToken))
{
await _semaphore.WaitAsync(cancellationToken);
try
{
return await taskFactory().ConfigureAwait(false);
}
finally
{
_semaphore.Release(1);
}
}
}
和一个简单的控制台应用程序来测试
class Program
{
static void Main(string[] args)
{
var worker = new TaskWorker(1);
var cts = new CancellationTokenSource();
var token = cts.Token;
var tasks = Enumerable.Range(1, 10)
.Select(e => worker.RunAsync(() => SomeWorkAsync(e, token), token))
.ToArray();
Task.WhenAll(tasks).GetAwaiter().GetResult();
}
static async Task SomeWorkAsync(int id, CancellationToken cancellationToken)
{
Console.WriteLine($"Some Started {id}");
await Task.Delay(2000, cancellationToken).ConfigureAwait(false);
Console.WriteLine($"Some Finished {id}");
}
}
更新
TaskWorker
实施 IDisposable
class TaskWorker : IDisposable
{
private readonly CancellationTokenSource _cts = new CancellationTokenSource();
private readonly SemaphoreSlim _semaphore;
private readonly int _maxDegreeOfParallelism;
public TaskWorker(int maxDegreeOfParallelism)
{
if (maxDegreeOfParallelism <= 0)
{
throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism));
}
_maxDegreeOfParallelism = maxDegreeOfParallelism;
_semaphore = new SemaphoreSlim(maxDegreeOfParallelism, maxDegreeOfParallelism);
}
public async Task RunAsync(Func<Task> taskFactory, CancellationToken cancellationToken = default(CancellationToken))
{
ThrowIfDisposed();
using (var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cts.Token))
{
// No ConfigureAwait(false) here to keep the SyncContext if any
// for the real task
await _semaphore.WaitAsync(cts.Token);
try
{
await taskFactory().ConfigureAwait(false);
}
finally
{
_semaphore.Release(1);
}
}
}
public async Task<T> RunAsync<T>(Func<Task<T>> taskFactory, CancellationToken cancellationToken = default(CancellationToken))
{
ThrowIfDisposed();
using (var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cts.Token))
{
await _semaphore.WaitAsync(cts.Token);
try
{
return await taskFactory().ConfigureAwait(false);
}
finally
{
_semaphore.Release(1);
}
}
}
private void ThrowIfDisposed()
{
if (disposedValue)
{
throw new ObjectDisposedException(this.GetType().FullName);
}
}
#region IDisposable Support
private bool disposedValue = false;
protected virtual void Dispose(bool disposing)
{
if (!disposedValue)
{
if (disposing)
{
_cts.Cancel();
// consume all semaphore slots
for (int i = 0; i < _maxDegreeOfParallelism; i++)
{
_semaphore.WaitAsync().GetAwaiter().GetResult();
}
_semaphore.Dispose();
_cts.Dispose();
}
disposedValue = true;
}
}
public void Dispose()
{
Dispose(true);
}
#endregion
}