尝试 运行 多个任务与 .WhenAll 并行,但任务不是 运行 并行
Trying to run multiple tasks in parallel with .WhenAll but tasks aren't being run in parallel
我有一种方法可以将 csv 文件转换为特定模型,我想将其拆分为多个任务,因为有 700k+ 条记录。我在方法中使用了 .Skip
和 .Take
,因此该方法的每个 运行ning 都知道从哪里开始以及需要多少。我有一个数字 1-10 的列表,我想迭代并创建任务 运行 此方法使用该迭代器创建任务并进行一些数学运算以确定要跳过的记录数。
以下是我创建任务的方式:
var numberOfTasksList = Enumerable.Range(1, 10).ToList();
//I left out the math to determine rowsPerTask used as a parameter in the below method for brevity's sake
var tasks = numberOfTasksList.Select(i
=> ReadRowsList<T>(props, fields, csv, context, zohoEntities, i, i*rowsPerTask, (i-1)*rowsPerTask));
await Task.WhenAll(tasks);
使用的 ReadRowsList
方法如下所示(没有参数):
public static async Task<string> ReadRowsList<T>(...parameters) where T : class, new()
{
//work to run
return $"added rows for task {i}";
}
那个方法的字符串 returns 只是一个简单的行,上面写着 $"added rows for task {i}" 所以它不是一个正确的 async/await 因为我只是返回一个说明迭代何时完成的字符串。
但是,当我 运行 程序时,该方法会等待第一次迭代(其中 i=1)完成,然后再开始 运行 程序的第二次迭代,所以它不是运行宁并行。在 async/parallel 编程方面我不是最好的,但是是否有明显的事情会导致任务必须等到上一次迭代完成才能开始下一个任务?根据我的理解,使用上面的代码创建任务并使用 .WhenAll(tasks)
将为每次迭代创建一个新线程,但我一定遗漏了一些东西。
简而言之:
async
不等于多线程;和
- 创建函数
async Task
不会使其成为异步的
当 Task.WhenAll
是 运行 假装异步代码没有 await
时,当前线程无法 'let go' 手头的任务,也无法开始处理另一个任务。
正如评论中指出的那样,构建链通过以下方式警告您:
This async method lacks 'await' operators and will run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread.
简单的例子
让我们考虑两个具有相同签名的函数,一个有异步代码,一个没有。
static async Task DoWorkPretendAsync(int taskId)
{
Console.WriteLine($"Thread: {Thread.CurrentThread.ManagedThreadId} -> task:{taskId} > start");
Thread.Sleep(TimeSpan.FromSeconds(1));
Console.WriteLine($"Thread: {Thread.CurrentThread.ManagedThreadId} -> task:{taskId} > done");
}
static async Task DoWorkAsync(int taskId)
{
Console.WriteLine($"Thread: {Thread.CurrentThread.ManagedThreadId} -> task:{taskId} > start");
await Task.Delay(TimeSpan.FromSeconds(1));
Console.WriteLine($"Thread: {Thread.CurrentThread.ManagedThreadId} -> task:{taskId} > done");
}
如果我们用下面的代码片段测试它们
await DoItAsync(DoWorkPretendAsync);
Console.WriteLine();
await DoItAsync(DoWorkAsync);
async Task DoItAsync(Func<int, Task> f)
{
var tasks = Enumerable.Range(start: 0, count: 3).Select(i => f(i));
Console.WriteLine("Before WhenAll");
await Task.WhenAll(tasks);
Console.WriteLine("After WhenAll");
}
我们可以看到 DoWorkPretendAsync
任务是按顺序执行的。
Before WhenAll
Thread: 1 -> task:0 > start
Thread: 1 -> task:0 > done
Thread: 1 -> task:1 > start
Thread: 1 -> task:1 > done
Thread: 1 -> task:2 > start
Thread: 1 -> task:2 > done
After WhenAll
Before WhenAll
Thread: 1 -> task:0 > start
Thread: 1 -> task:1 > start
Thread: 1 -> task:2 > start
Thread: 5 -> task:0 > done
Thread: 5 -> task:2 > done
Thread: 7 -> task:1 > done
After WhenAll
注意事项:
- 即使使用真正的异步,所有任务也由同一个线程启动;
- 在这个特定的 运行 中,两个任务由同一个线程 (id:5) 完成。这根本无法保证 - 一项任务可以在一个线程上启动,稍后在池中的另一个线程上继续。
我有一种方法可以将 csv 文件转换为特定模型,我想将其拆分为多个任务,因为有 700k+ 条记录。我在方法中使用了 .Skip
和 .Take
,因此该方法的每个 运行ning 都知道从哪里开始以及需要多少。我有一个数字 1-10 的列表,我想迭代并创建任务 运行 此方法使用该迭代器创建任务并进行一些数学运算以确定要跳过的记录数。
以下是我创建任务的方式:
var numberOfTasksList = Enumerable.Range(1, 10).ToList();
//I left out the math to determine rowsPerTask used as a parameter in the below method for brevity's sake
var tasks = numberOfTasksList.Select(i
=> ReadRowsList<T>(props, fields, csv, context, zohoEntities, i, i*rowsPerTask, (i-1)*rowsPerTask));
await Task.WhenAll(tasks);
使用的 ReadRowsList
方法如下所示(没有参数):
public static async Task<string> ReadRowsList<T>(...parameters) where T : class, new()
{
//work to run
return $"added rows for task {i}";
}
那个方法的字符串 returns 只是一个简单的行,上面写着 $"added rows for task {i}" 所以它不是一个正确的 async/await 因为我只是返回一个说明迭代何时完成的字符串。
但是,当我 运行 程序时,该方法会等待第一次迭代(其中 i=1)完成,然后再开始 运行 程序的第二次迭代,所以它不是运行宁并行。在 async/parallel 编程方面我不是最好的,但是是否有明显的事情会导致任务必须等到上一次迭代完成才能开始下一个任务?根据我的理解,使用上面的代码创建任务并使用 .WhenAll(tasks)
将为每次迭代创建一个新线程,但我一定遗漏了一些东西。
简而言之:
async
不等于多线程;和- 创建函数
async Task
不会使其成为异步的
当 Task.WhenAll
是 运行 假装异步代码没有 await
时,当前线程无法 'let go' 手头的任务,也无法开始处理另一个任务。
正如评论中指出的那样,构建链通过以下方式警告您:
This async method lacks 'await' operators and will run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread.
简单的例子
让我们考虑两个具有相同签名的函数,一个有异步代码,一个没有。
static async Task DoWorkPretendAsync(int taskId)
{
Console.WriteLine($"Thread: {Thread.CurrentThread.ManagedThreadId} -> task:{taskId} > start");
Thread.Sleep(TimeSpan.FromSeconds(1));
Console.WriteLine($"Thread: {Thread.CurrentThread.ManagedThreadId} -> task:{taskId} > done");
}
static async Task DoWorkAsync(int taskId)
{
Console.WriteLine($"Thread: {Thread.CurrentThread.ManagedThreadId} -> task:{taskId} > start");
await Task.Delay(TimeSpan.FromSeconds(1));
Console.WriteLine($"Thread: {Thread.CurrentThread.ManagedThreadId} -> task:{taskId} > done");
}
如果我们用下面的代码片段测试它们
await DoItAsync(DoWorkPretendAsync);
Console.WriteLine();
await DoItAsync(DoWorkAsync);
async Task DoItAsync(Func<int, Task> f)
{
var tasks = Enumerable.Range(start: 0, count: 3).Select(i => f(i));
Console.WriteLine("Before WhenAll");
await Task.WhenAll(tasks);
Console.WriteLine("After WhenAll");
}
我们可以看到 DoWorkPretendAsync
任务是按顺序执行的。
Before WhenAll
Thread: 1 -> task:0 > start
Thread: 1 -> task:0 > done
Thread: 1 -> task:1 > start
Thread: 1 -> task:1 > done
Thread: 1 -> task:2 > start
Thread: 1 -> task:2 > done
After WhenAll
Before WhenAll
Thread: 1 -> task:0 > start
Thread: 1 -> task:1 > start
Thread: 1 -> task:2 > start
Thread: 5 -> task:0 > done
Thread: 5 -> task:2 > done
Thread: 7 -> task:1 > done
After WhenAll
注意事项:
- 即使使用真正的异步,所有任务也由同一个线程启动;
- 在这个特定的 运行 中,两个任务由同一个线程 (id:5) 完成。这根本无法保证 - 一项任务可以在一个线程上启动,稍后在池中的另一个线程上继续。