带有 await 的每个循环的任务工厂
Task Factory for each loop with await
我是新手,对使用有疑问。 Task.Factory 是否为 foreach 循环中的所有项目触发或在 'await' 处阻塞基本上使程序成为单线程?如果我正确地考虑了这一点,则 foreach 循环将启动所有任务和 .GetAwaiter().GetResult();阻塞主线程直到最后一个任务完成。
另外,我只是想要一些匿名任务来加载数据。这是一个正确的实现吗?我指的不是异常处理,因为这只是一个示例。
为了清楚起见,我从外部 API 将数据加载到数据库中。这个使用的是 FRED 数据库。 (https://fred.stlouisfed.org/),但我有几个我会点击以完成整个传输(可能是 200k 数据点)。完成后,我会更新表格、刷新市场计算等。其中一些是实时的,一些是收盘时的。我还想说,我目前在 docker 中一切正常,但一直在努力使用任务更新代码以提高执行力。
class Program
{
private async Task SQLBulkLoader()
{
foreach (var fileListObj in indicators.file_list)
{
await Task.Factory.StartNew( () =>
{
string json = this.GET(//API call);
SeriesObject obj = JsonConvert.DeserializeObject<SeriesObject>(json);
DataTable dataTableConversion = ConvertToDataTable(obj.observations);
dataTableConversion.TableName = fileListObj.series_id;
using (SqlConnection dbConnection = new SqlConnection("SQL Connection"))
{
dbConnection.Open();
using (SqlBulkCopy s = new SqlBulkCopy(dbConnection))
{
s.DestinationTableName = dataTableConversion.TableName;
foreach (var column in dataTableConversion.Columns)
s.ColumnMappings.Add(column.ToString(), column.ToString());
s.WriteToServer(dataTableConversion);
}
Console.WriteLine("File: {0} Complete", fileListObj.series_id);
}
});
}
}
static void Main(string[] args)
{
Program worker = new Program();
worker.SQLBulkLoader().GetAwaiter().GetResult();
}
}
你为什么不试试这个 :),这个程序不会启动并行任务(在 foreach 中),它会阻塞但是任务中的逻辑将在线程池中的单独线程中完成(当时只有一个,但是主线程会被阻塞)。
根据您的情况,正确的方法是使用 Paraller.ForEach
How can I convert this foreach code to Parallel.ForEach?
使用 Parallel.ForEach
循环在任何 System.Collections.Generic.IEnumerable<T>
源上启用数据并行。
// Method signature: Parallel.ForEach(IEnumerable<TSource> source, Action<TSource> body)
Parallel.ForEach(fileList, (currentFile) =>
{
//Doing Stuff
Console.WriteLine("Processing {0} on thread {1}", currentFile, Thread.CurrentThread.ManagedThreadId);
});
您等待从 Task.Factory.StartNew
返回的任务确实使它成为有效的单线程。您可以通过这个简短的 LinqPad 示例看到对此的简单演示:
for (var i = 0; i < 3; i++)
{
var index = i;
$"{index} inline".Dump();
await Task.Run(() =>
{
Thread.Sleep((3 - index) * 1000);
$"{index} in thread".Dump();
});
}
在这里,随着循环的进行,我们等待的时间更少。输出是:
0 inline
0 in thread
1 inline
1 in thread
2 inline
2 in thread
如果删除 StartNew
前面的 await
,您会看到它是并行运行的。正如其他人所提到的,您当然可以使用 Parallel.ForEach
,但是为了更手动地进行演示,您可以考虑这样的解决方案:
var tasks = new List<Task>();
for (var i = 0; i < 3; i++)
{
var index = i;
$"{index} inline".Dump();
tasks.Add(Task.Factory.StartNew(() =>
{
Thread.Sleep((3 - index) * 1000);
$"{index} in thread".Dump();
}));
}
Task.WaitAll(tasks.ToArray());
现在请注意结果如何:
0 inline
1 inline
2 inline
2 in thread
1 in thread
0 in thread
您需要将每个任务添加到一个集合中,然后使用 Task.WhenAll 等待该集合中的所有任务:
private async Task SQLBulkLoader()
{
var tasks = new List<Task>();
foreach (var fileListObj in indicators.file_list)
{
tasks.Add(Task.Factory.StartNew( () => { //Doing Stuff }));
}
await Task.WhenAll(tasks.ToArray());
}
这是一个典型的问题,C# 8.0 Async Streams 很快就会解决。
在 C# 8.0 发布之前,您可以使用 AsyncEnumarator library:
using System.Collections.Async;
class Program
{
private async Task SQLBulkLoader() {
await indicators.file_list.ParallelForEachAsync(async fileListObj =>
{
...
await s.WriteToServerAsync(dataTableConversion);
...
},
maxDegreeOfParalellism: 3,
cancellationToken: default);
}
static void Main(string[] args)
{
Program worker = new Program();
worker.SQLBulkLoader().GetAwaiter().GetResult();
}
}
我不建议使用 Parallel.ForEach
和 Task.WhenAll
,因为这些函数不是为异步流设计的。
我对此的看法:最耗时的操作是使用 GET 操作获取数据和使用 SqlBulkCopy
对 WriteToServer
的实际调用。如果你看一下 class 你会发现有一个本地异步方法 WriteToServerAsync
方法 (docs here)
.在使用 Task.Run
.
自己创建任务之前始终使用这些
这同样适用于 http GET 调用。您可以为此使用本机 HttpClient.GetAsync
(docs here)。
这样做你可以重写你的代码:
private async Task ProcessFileAsync(string series_id)
{
string json = await GetAsync();
SeriesObject obj = JsonConvert.DeserializeObject<SeriesObject>(json);
DataTable dataTableConversion = ConvertToDataTable(obj.observations);
dataTableConversion.TableName = series_id;
using (SqlConnection dbConnection = new SqlConnection("SQL Connection"))
{
dbConnection.Open();
using (SqlBulkCopy s = new SqlBulkCopy(dbConnection))
{
s.DestinationTableName = dataTableConversion.TableName;
foreach (var column in dataTableConversion.Columns)
s.ColumnMappings.Add(column.ToString(), column.ToString());
await s.WriteToServerAsync(dataTableConversion);
}
Console.WriteLine("File: {0} Complete", series_id);
}
}
private async Task SQLBulkLoaderAsync()
{
var tasks = indicators.file_list.Select(f => ProcessFileAsync(f.series_id));
await Task.WhenAll(tasks);
}
两个操作(http 调用和 sql 服务器调用)都是 I/O 调用。使用本机 async/await 模式甚至不会创建或使用线程,请参阅 for a more in-depth explanation. That is why for IO bound operations you should never have to use Task.Run
(or Task.Factory.StartNew
. But do mind that Task.Run
is the recommended approach)。
旁注:如果您在循环中使用 HttpClient
,请阅读 this 了解如何正确使用它。
如果您需要限制并行操作的数量,您也可以使用 TPL Dataflow,因为它非常适合基于任务的 IO 绑定操作。然后应将 SQLBulkLoaderAsync
修改为(完整保留此答案前面的 ProcessFileAsync
方法):
private async Task SQLBulkLoaderAsync()
{
var ab = new ActionBlock<string>(ProcessFileAsync, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });
foreach (var file in indicators.file_list)
{
ab.Post(file.series_id);
}
ab.Complete();
await ab.Completion;
}
我是新手,对使用有疑问。 Task.Factory 是否为 foreach 循环中的所有项目触发或在 'await' 处阻塞基本上使程序成为单线程?如果我正确地考虑了这一点,则 foreach 循环将启动所有任务和 .GetAwaiter().GetResult();阻塞主线程直到最后一个任务完成。
另外,我只是想要一些匿名任务来加载数据。这是一个正确的实现吗?我指的不是异常处理,因为这只是一个示例。
为了清楚起见,我从外部 API 将数据加载到数据库中。这个使用的是 FRED 数据库。 (https://fred.stlouisfed.org/),但我有几个我会点击以完成整个传输(可能是 200k 数据点)。完成后,我会更新表格、刷新市场计算等。其中一些是实时的,一些是收盘时的。我还想说,我目前在 docker 中一切正常,但一直在努力使用任务更新代码以提高执行力。
class Program
{
private async Task SQLBulkLoader()
{
foreach (var fileListObj in indicators.file_list)
{
await Task.Factory.StartNew( () =>
{
string json = this.GET(//API call);
SeriesObject obj = JsonConvert.DeserializeObject<SeriesObject>(json);
DataTable dataTableConversion = ConvertToDataTable(obj.observations);
dataTableConversion.TableName = fileListObj.series_id;
using (SqlConnection dbConnection = new SqlConnection("SQL Connection"))
{
dbConnection.Open();
using (SqlBulkCopy s = new SqlBulkCopy(dbConnection))
{
s.DestinationTableName = dataTableConversion.TableName;
foreach (var column in dataTableConversion.Columns)
s.ColumnMappings.Add(column.ToString(), column.ToString());
s.WriteToServer(dataTableConversion);
}
Console.WriteLine("File: {0} Complete", fileListObj.series_id);
}
});
}
}
static void Main(string[] args)
{
Program worker = new Program();
worker.SQLBulkLoader().GetAwaiter().GetResult();
}
}
你为什么不试试这个 :),这个程序不会启动并行任务(在 foreach 中),它会阻塞但是任务中的逻辑将在线程池中的单独线程中完成(当时只有一个,但是主线程会被阻塞)。
根据您的情况,正确的方法是使用 Paraller.ForEach How can I convert this foreach code to Parallel.ForEach?
使用 Parallel.ForEach
循环在任何 System.Collections.Generic.IEnumerable<T>
源上启用数据并行。
// Method signature: Parallel.ForEach(IEnumerable<TSource> source, Action<TSource> body)
Parallel.ForEach(fileList, (currentFile) =>
{
//Doing Stuff
Console.WriteLine("Processing {0} on thread {1}", currentFile, Thread.CurrentThread.ManagedThreadId);
});
您等待从 Task.Factory.StartNew
返回的任务确实使它成为有效的单线程。您可以通过这个简短的 LinqPad 示例看到对此的简单演示:
for (var i = 0; i < 3; i++)
{
var index = i;
$"{index} inline".Dump();
await Task.Run(() =>
{
Thread.Sleep((3 - index) * 1000);
$"{index} in thread".Dump();
});
}
在这里,随着循环的进行,我们等待的时间更少。输出是:
0 inline
0 in thread
1 inline
1 in thread
2 inline
2 in thread
如果删除 StartNew
前面的 await
,您会看到它是并行运行的。正如其他人所提到的,您当然可以使用 Parallel.ForEach
,但是为了更手动地进行演示,您可以考虑这样的解决方案:
var tasks = new List<Task>();
for (var i = 0; i < 3; i++)
{
var index = i;
$"{index} inline".Dump();
tasks.Add(Task.Factory.StartNew(() =>
{
Thread.Sleep((3 - index) * 1000);
$"{index} in thread".Dump();
}));
}
Task.WaitAll(tasks.ToArray());
现在请注意结果如何:
0 inline
1 inline
2 inline
2 in thread
1 in thread
0 in thread
您需要将每个任务添加到一个集合中,然后使用 Task.WhenAll 等待该集合中的所有任务:
private async Task SQLBulkLoader()
{
var tasks = new List<Task>();
foreach (var fileListObj in indicators.file_list)
{
tasks.Add(Task.Factory.StartNew( () => { //Doing Stuff }));
}
await Task.WhenAll(tasks.ToArray());
}
这是一个典型的问题,C# 8.0 Async Streams 很快就会解决。
在 C# 8.0 发布之前,您可以使用 AsyncEnumarator library:
using System.Collections.Async;
class Program
{
private async Task SQLBulkLoader() {
await indicators.file_list.ParallelForEachAsync(async fileListObj =>
{
...
await s.WriteToServerAsync(dataTableConversion);
...
},
maxDegreeOfParalellism: 3,
cancellationToken: default);
}
static void Main(string[] args)
{
Program worker = new Program();
worker.SQLBulkLoader().GetAwaiter().GetResult();
}
}
我不建议使用 Parallel.ForEach
和 Task.WhenAll
,因为这些函数不是为异步流设计的。
我对此的看法:最耗时的操作是使用 GET 操作获取数据和使用 SqlBulkCopy
对 WriteToServer
的实际调用。如果你看一下 class 你会发现有一个本地异步方法 WriteToServerAsync
方法 (docs here)
.在使用 Task.Run
.
这同样适用于 http GET 调用。您可以为此使用本机 HttpClient.GetAsync
(docs here)。
这样做你可以重写你的代码:
private async Task ProcessFileAsync(string series_id)
{
string json = await GetAsync();
SeriesObject obj = JsonConvert.DeserializeObject<SeriesObject>(json);
DataTable dataTableConversion = ConvertToDataTable(obj.observations);
dataTableConversion.TableName = series_id;
using (SqlConnection dbConnection = new SqlConnection("SQL Connection"))
{
dbConnection.Open();
using (SqlBulkCopy s = new SqlBulkCopy(dbConnection))
{
s.DestinationTableName = dataTableConversion.TableName;
foreach (var column in dataTableConversion.Columns)
s.ColumnMappings.Add(column.ToString(), column.ToString());
await s.WriteToServerAsync(dataTableConversion);
}
Console.WriteLine("File: {0} Complete", series_id);
}
}
private async Task SQLBulkLoaderAsync()
{
var tasks = indicators.file_list.Select(f => ProcessFileAsync(f.series_id));
await Task.WhenAll(tasks);
}
两个操作(http 调用和 sql 服务器调用)都是 I/O 调用。使用本机 async/await 模式甚至不会创建或使用线程,请参阅 Task.Run
(or Task.Factory.StartNew
. But do mind that Task.Run
is the recommended approach)。
旁注:如果您在循环中使用 HttpClient
,请阅读 this 了解如何正确使用它。
如果您需要限制并行操作的数量,您也可以使用 TPL Dataflow,因为它非常适合基于任务的 IO 绑定操作。然后应将 SQLBulkLoaderAsync
修改为(完整保留此答案前面的 ProcessFileAsync
方法):
private async Task SQLBulkLoaderAsync()
{
var ab = new ActionBlock<string>(ProcessFileAsync, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });
foreach (var file in indicators.file_list)
{
ab.Post(file.series_id);
}
ab.Complete();
await ab.Completion;
}