SemaphoreSlim 没有限制任务
SemaphoreSlim is not throttling the tasks
我创建了以下方法 TestThrottled 来尝试限制我的任务,但它根本没有限制,当我调用 WhenAll 并且此方法都具有相同的经过时间时。我做错了什么吗?
private static async Task<T[]> TestThrottled<T>(List<Task<T>> tasks, int maxDegreeOfParallelism)
{
var semaphore = new SemaphoreSlim(maxDegreeOfParallelism);
var tasksParallelized = new List<Task<T>>();
foreach (var task in tasks)
{
var taskParallelized = Task.Run(async () =>
{
try
{
await semaphore.WaitAsync();
return await task;
}
finally
{
semaphore.Release();
}
});
tasksParallelized.Add(taskParallelized);
}
return await Task.WhenAll(tasksParallelized);
}
private static async Task<int> TestAsync()
{
await Task.Delay(1000);
return 1;
}
static async Task Main(string[] args)
{
var sw = Stopwatch.StartNew();
var tasks = new List<Task<int>>();
var ints = new List<int>();
for (int i = 0; i < 30; i++)
{
tasks.Add(TestAsync());
}
ints.AddRange(await TestThrottled(tasks, 1));
Console.WriteLine($"{sw.ElapsedMilliseconds}, count: {ints.Count}");
Console.ReadLine();
}
这里的主要问题是 async/await
的行为。当你打电话时会发生什么
private static async Task<int> TestAsync()
{
await Task.Delay(1000);
return 1;
}
TestAync();
TestAsync()
被调用。在该方法中, Task.Delay()
被调用。这将创建一个在 1000 毫秒后完成的任务。最后,您 return 该任务(实际上,另一个任务被安排为 return 由 Task.Delay()
编辑的任务的延续)。
您在 Main()
的循环中几乎同时创建了所有这些任务。因此,尽管您可能有一个阻止多个线程同时调用 await task
的信号量,但它们都安排在大约同一时间完成。 await
仅在任务尚未完成时等待。因此,一旦第一个线程释放信号量(大约一秒后),下一个线程就可以进入临界区,在那里它会发现任务已经完成(或非常接近完成)。然后它可以立即释放信号量。其余任务也会发生这种情况,您总共 运行 大约一秒的时间。
我解决了我的问题(创建一个接收异步方法列表的通用节流任务运行器),操作如下:
private static async Task<T[]> RunAsyncThrottled<T>(IEnumerable<Func<Task<T>>> tasks, int maxDegreeOfParallelism)
{
var tasksParallelized = new List<Task<T>>();
using (var semaphore = new SemaphoreSlim(maxDegreeOfParallelism))
{
foreach (var task in tasks)
{
var taskParallelized = Task.Run(async () =>
{
await semaphore.WaitAsync();
try
{
return await task.Invoke();
}
finally
{
semaphore.Release();
}
});
tasksParallelized.Add(taskParallelized);
}
return await Task.WhenAll(tasksParallelized);
}
}
private static async Task<int> TestAsync(int num)
{
await Task.Delay(1000);
return 1 + num;
}
static async Task Main(string[] args)
{
var sw = Stopwatch.StartNew();
var tasks = new List<Func<Task<int>>>();
var ints = new List<int>();
for (int i = 0; i < 10; i++)
{
tasks.Add(() => TestAsync(12000));
}
ints.AddRange(await RunAsyncThrottled(tasks, 1000));
Console.WriteLine($"{sw.Elapsed.TotalMilliseconds}, count: {ints.Count}");
Console.ReadLine();
}
另一种方法是使用 TPL DataFlow,它已经拥有您需要的一切,并且可以根据需要满足更复杂的 流水线,而且更多可配置的。它还可以节省您在示例解决方案中卸载到另一个任务的负担
private static async Task<IList<T>> TestThrottled<T>(IEnumerable<Func<Task<T>>> tasks, int maxDegreeOfParallelism)
{
var options = new ExecutionDataflowBlockOptions() { EnsureOrdered = false, MaxDegreeOfParallelism = maxDegreeOfParallelism };
var transform = new TransformBlock<Func<Task<T>>, T>(func => func.Invoke(), options);
var outputBufferBlock = new BufferBlock<T>();
transform.LinkTo(outputBufferBlock, new DataflowLinkOptions(){PropagateCompletion = true});
foreach (var task in tasks)
transform.Post(task);
transform.Complete();
await outputBufferBlock. Completion;
outputBufferBlock.TryReceiveAll(out var result);
return result;
}
解决这个问题的关键是让节流器启动任务,而不是预先启动它们。由于使用旧的 Task.Start
方法显式启动任务非常受限(早于并且不能利用 async-await 机制),唯一的选择是让节流器创建任务。有多种方法可以做到这一点:
- 传递任务工厂而不是任务。此方法已在其他答案中演示。
private static async Task<TResult[]> RunAsyncThrottled<TResult>(
IEnumerable<Func<Task<TResult>>> taskFactories,
int maxDegreeOfParallelism)
{
//...
foreach (var taskFactory in taskFactories)
//...
var task = taskFactory();
TResult result = await task;
}
- 传递一系列项目和一个接受项目作为参数的任务工厂。这是最常用的方法:
private static async Task<TResult[]> RunAsyncThrottled<TSource, TResult>(
IEnumerable<TSource> items, Func<TSource, Task<TResult>> taskFactory,
int maxDegreeOfParallelism)
{
//...
foreach (var item in items)
//...
var task = taskFactory(item);
TResult result = await task;
}
- 传递延迟的可枚举任务。可以使用 LINQ 或迭代器(
yield
的方法)创建这样的枚举。
private static async Task<TResult[]> RunAsyncThrottled<TResult>(
IEnumerable<Task<TResult>> tasks, int maxDegreeOfParallelism)
{
if (tasks is ICollection<Task<TResult>>) throw new ArgumentException(
"The enumerable should not be materialized.", nameof(tasks));
//...
foreach (var task in tasks)
//...
TResult result = await task;
}
由于C# 8现已发布,因此在方法的return值上有一个替代方法。而不是 returning Task<TResult[]>
它可以 return
IAsyncEnumerable<TResult>
,允许使用 await foreach
.
进行异步枚举
private static async IAsyncEnumerable<TResult> RunAsyncThrottled<TSource, TResult>(
IEnumerable<TSource> items, Func<TSource, Task<TResult>> taskFactory,
int maxDegreeOfParallelism)
{
//...
foreach (var item in items)
//...
yield return await taskFactory(item);
}
我创建了以下方法 TestThrottled 来尝试限制我的任务,但它根本没有限制,当我调用 WhenAll 并且此方法都具有相同的经过时间时。我做错了什么吗?
private static async Task<T[]> TestThrottled<T>(List<Task<T>> tasks, int maxDegreeOfParallelism)
{
var semaphore = new SemaphoreSlim(maxDegreeOfParallelism);
var tasksParallelized = new List<Task<T>>();
foreach (var task in tasks)
{
var taskParallelized = Task.Run(async () =>
{
try
{
await semaphore.WaitAsync();
return await task;
}
finally
{
semaphore.Release();
}
});
tasksParallelized.Add(taskParallelized);
}
return await Task.WhenAll(tasksParallelized);
}
private static async Task<int> TestAsync()
{
await Task.Delay(1000);
return 1;
}
static async Task Main(string[] args)
{
var sw = Stopwatch.StartNew();
var tasks = new List<Task<int>>();
var ints = new List<int>();
for (int i = 0; i < 30; i++)
{
tasks.Add(TestAsync());
}
ints.AddRange(await TestThrottled(tasks, 1));
Console.WriteLine($"{sw.ElapsedMilliseconds}, count: {ints.Count}");
Console.ReadLine();
}
这里的主要问题是 async/await
的行为。当你打电话时会发生什么
private static async Task<int> TestAsync()
{
await Task.Delay(1000);
return 1;
}
TestAync();
TestAsync()
被调用。在该方法中, Task.Delay()
被调用。这将创建一个在 1000 毫秒后完成的任务。最后,您 return 该任务(实际上,另一个任务被安排为 return 由 Task.Delay()
编辑的任务的延续)。
您在 Main()
的循环中几乎同时创建了所有这些任务。因此,尽管您可能有一个阻止多个线程同时调用 await task
的信号量,但它们都安排在大约同一时间完成。 await
仅在任务尚未完成时等待。因此,一旦第一个线程释放信号量(大约一秒后),下一个线程就可以进入临界区,在那里它会发现任务已经完成(或非常接近完成)。然后它可以立即释放信号量。其余任务也会发生这种情况,您总共 运行 大约一秒的时间。
我解决了我的问题(创建一个接收异步方法列表的通用节流任务运行器),操作如下:
private static async Task<T[]> RunAsyncThrottled<T>(IEnumerable<Func<Task<T>>> tasks, int maxDegreeOfParallelism)
{
var tasksParallelized = new List<Task<T>>();
using (var semaphore = new SemaphoreSlim(maxDegreeOfParallelism))
{
foreach (var task in tasks)
{
var taskParallelized = Task.Run(async () =>
{
await semaphore.WaitAsync();
try
{
return await task.Invoke();
}
finally
{
semaphore.Release();
}
});
tasksParallelized.Add(taskParallelized);
}
return await Task.WhenAll(tasksParallelized);
}
}
private static async Task<int> TestAsync(int num)
{
await Task.Delay(1000);
return 1 + num;
}
static async Task Main(string[] args)
{
var sw = Stopwatch.StartNew();
var tasks = new List<Func<Task<int>>>();
var ints = new List<int>();
for (int i = 0; i < 10; i++)
{
tasks.Add(() => TestAsync(12000));
}
ints.AddRange(await RunAsyncThrottled(tasks, 1000));
Console.WriteLine($"{sw.Elapsed.TotalMilliseconds}, count: {ints.Count}");
Console.ReadLine();
}
另一种方法是使用 TPL DataFlow,它已经拥有您需要的一切,并且可以根据需要满足更复杂的 流水线,而且更多可配置的。它还可以节省您在示例解决方案中卸载到另一个任务的负担
private static async Task<IList<T>> TestThrottled<T>(IEnumerable<Func<Task<T>>> tasks, int maxDegreeOfParallelism)
{
var options = new ExecutionDataflowBlockOptions() { EnsureOrdered = false, MaxDegreeOfParallelism = maxDegreeOfParallelism };
var transform = new TransformBlock<Func<Task<T>>, T>(func => func.Invoke(), options);
var outputBufferBlock = new BufferBlock<T>();
transform.LinkTo(outputBufferBlock, new DataflowLinkOptions(){PropagateCompletion = true});
foreach (var task in tasks)
transform.Post(task);
transform.Complete();
await outputBufferBlock. Completion;
outputBufferBlock.TryReceiveAll(out var result);
return result;
}
解决这个问题的关键是让节流器启动任务,而不是预先启动它们。由于使用旧的 Task.Start
方法显式启动任务非常受限(早于并且不能利用 async-await 机制),唯一的选择是让节流器创建任务。有多种方法可以做到这一点:
- 传递任务工厂而不是任务。此方法已在其他答案中演示。
private static async Task<TResult[]> RunAsyncThrottled<TResult>(
IEnumerable<Func<Task<TResult>>> taskFactories,
int maxDegreeOfParallelism)
{
//...
foreach (var taskFactory in taskFactories)
//...
var task = taskFactory();
TResult result = await task;
}
- 传递一系列项目和一个接受项目作为参数的任务工厂。这是最常用的方法:
private static async Task<TResult[]> RunAsyncThrottled<TSource, TResult>(
IEnumerable<TSource> items, Func<TSource, Task<TResult>> taskFactory,
int maxDegreeOfParallelism)
{
//...
foreach (var item in items)
//...
var task = taskFactory(item);
TResult result = await task;
}
- 传递延迟的可枚举任务。可以使用 LINQ 或迭代器(
yield
的方法)创建这样的枚举。
private static async Task<TResult[]> RunAsyncThrottled<TResult>(
IEnumerable<Task<TResult>> tasks, int maxDegreeOfParallelism)
{
if (tasks is ICollection<Task<TResult>>) throw new ArgumentException(
"The enumerable should not be materialized.", nameof(tasks));
//...
foreach (var task in tasks)
//...
TResult result = await task;
}
由于C# 8现已发布,因此在方法的return值上有一个替代方法。而不是 returning Task<TResult[]>
它可以 return
IAsyncEnumerable<TResult>
,允许使用 await foreach
.
private static async IAsyncEnumerable<TResult> RunAsyncThrottled<TSource, TResult>(
IEnumerable<TSource> items, Func<TSource, Task<TResult>> taskFactory,
int maxDegreeOfParallelism)
{
//...
foreach (var item in items)
//...
yield return await taskFactory(item);
}