带有 await 的每个循环的任务工厂

Task Factory for each loop with await

我是新手,对使用有疑问。 Task.Factory 是否为 foreach 循环中的所有项目触发或在 'await' 处阻塞基本上使程序成为单线程?如果我正确地考虑了这一点,则 foreach 循环将启动所有任务和 .GetAwaiter().GetResult();阻塞主线程直到最后一个任务完成。

另外,我只是想要一些匿名任务来加载数据。这是一个正确的实现吗?我指的不是异常处理,因为这只是一个示例。

为了清楚起见,我从外部 API 将数据加载到数据库中。这个使用的是 FRED 数据库。 (https://fred.stlouisfed.org/),但我有几个我会点击以完成整个传输(可能是 200k 数据点)。完成后,我会更新表格、刷新市场计算等。其中一些是实时的,一些是收盘时的。我还想说,我目前在 docker 中一切正常,但一直在努力使用任务更新代码以提高执行力。

class Program
{
    private async Task SQLBulkLoader() 
    {

        foreach (var fileListObj in indicators.file_list)
        {
            await Task.Factory.StartNew(  () =>
            {

                string json = this.GET(//API call);

                SeriesObject obj = JsonConvert.DeserializeObject<SeriesObject>(json);

                DataTable dataTableConversion = ConvertToDataTable(obj.observations);
                dataTableConversion.TableName = fileListObj.series_id;

                using (SqlConnection dbConnection = new SqlConnection("SQL Connection"))
                {
                    dbConnection.Open();
                    using (SqlBulkCopy s = new SqlBulkCopy(dbConnection))
                    {
                      s.DestinationTableName = dataTableConversion.TableName;
                      foreach (var column in dataTableConversion.Columns)
                          s.ColumnMappings.Add(column.ToString(), column.ToString());
                      s.WriteToServer(dataTableConversion);
                    }

                  Console.WriteLine("File: {0} Complete", fileListObj.series_id);
                }
             });
        }            
    }

    static void Main(string[] args)
    {
        Program worker = new Program();
        worker.SQLBulkLoader().GetAwaiter().GetResult();
    }
}

你为什么不试试这个 :),这个程序不会启动并行任务(在 foreach 中),它会阻塞但是任务中的逻辑将在线程池中的单独线程中完成(当时只有一个,但是主线程会被阻塞)。

根据您的情况,正确的方法是使用 Paraller.ForEach How can I convert this foreach code to Parallel.ForEach?

使用 Parallel.ForEach 循环在任何 System.Collections.Generic.IEnumerable<T> 源上启用数据并行。

// Method signature: Parallel.ForEach(IEnumerable<TSource> source, Action<TSource> body)
    Parallel.ForEach(fileList, (currentFile) => 
    {

       //Doing Stuff

      Console.WriteLine("Processing {0} on thread {1}", currentFile, Thread.CurrentThread.ManagedThreadId);
    });

您等待从 Task.Factory.StartNew 返回的任务确实使它成为有效的单线程。您可以通过这个简短的 LinqPad 示例看到对此的简单演示:

for (var i = 0; i < 3; i++)
{
    var index = i;
    $"{index} inline".Dump();
    await Task.Run(() =>
    {
        Thread.Sleep((3 - index) * 1000);
        $"{index} in thread".Dump();
    });
}

在这里,随着循环的进行,我们等待的时间更少。输出是:

0 inline
0 in thread
1 inline
1 in thread
2 inline
2 in thread

如果删除 StartNew 前面的 await,您会看到它是并行运行的。正如其他人所提到的,您当然可以使用 Parallel.ForEach,但是为了更手动地进行演示,您可以考虑这样的解决方案:

var tasks = new List<Task>();

for (var i = 0; i < 3; i++) 
{
    var index = i;
    $"{index} inline".Dump();
    tasks.Add(Task.Factory.StartNew(() =>
    {
        Thread.Sleep((3 - index) * 1000);
        $"{index} in thread".Dump();
    }));
}

Task.WaitAll(tasks.ToArray());

现在请注意结果如何:

0 inline
1 inline
2 inline
2 in thread
1 in thread
0 in thread

您需要将每个任务添加到一个集合中,然后使用 Task.WhenAll 等待该集合中的所有任务:

private async Task SQLBulkLoader() 
{ 
  var tasks = new List<Task>();
  foreach (var fileListObj in indicators.file_list)
  {
    tasks.Add(Task.Factory.StartNew( () => { //Doing Stuff }));
  }

  await Task.WhenAll(tasks.ToArray());
}

这是一个典型的问题,C# 8.0 Async Streams 很快就会解决。

在 C# 8.0 发布之前,您可以使用 AsyncEnumarator library:

using System.Collections.Async;

class Program
{
    private async Task SQLBulkLoader() {

        await indicators.file_list.ParallelForEachAsync(async fileListObj =>
        {
            ...
            await s.WriteToServerAsync(dataTableConversion);
            ...
        },
        maxDegreeOfParalellism: 3,
        cancellationToken: default);
    }

    static void Main(string[] args)
    {
        Program worker = new Program();
        worker.SQLBulkLoader().GetAwaiter().GetResult();
    }
}

我不建议使用 Parallel.ForEachTask.WhenAll,因为这些函数不是为异步流设计的。

我对此的看法:最耗时的操作是使用 GET 操作获取数据和使用 SqlBulkCopyWriteToServer 的实际调用。如果你看一下 class 你会发现有一个本地异步方法 WriteToServerAsync 方法 (docs here) .在使用 Task.Run.

自己创建任务之前始终使用这些

这同样适用于 http GET 调用。您可以为此使用本机 HttpClient.GetAsync (docs here)。

这样做你可以重写你的代码:

private async Task ProcessFileAsync(string series_id)
{
    string json = await GetAsync();

    SeriesObject obj = JsonConvert.DeserializeObject<SeriesObject>(json);

    DataTable dataTableConversion = ConvertToDataTable(obj.observations);
    dataTableConversion.TableName = series_id;

    using (SqlConnection dbConnection = new SqlConnection("SQL Connection"))
    {
        dbConnection.Open();
        using (SqlBulkCopy s = new SqlBulkCopy(dbConnection))
        {
            s.DestinationTableName = dataTableConversion.TableName;
            foreach (var column in dataTableConversion.Columns)
                s.ColumnMappings.Add(column.ToString(), column.ToString());
            await s.WriteToServerAsync(dataTableConversion);
        }

        Console.WriteLine("File: {0} Complete", series_id);
    }
}

private async Task SQLBulkLoaderAsync()
{
    var tasks = indicators.file_list.Select(f => ProcessFileAsync(f.series_id));
    await Task.WhenAll(tasks);
}

两个操作(http 调用和 sql 服务器调用)都是 I/O 调用。使用本机 async/await 模式甚至不会创建或使用线程,请参阅 for a more in-depth explanation. That is why for IO bound operations you should never have to use Task.Run (or Task.Factory.StartNew. But do mind that Task.Run is the recommended approach)。

旁注:如果您在循环中使用 HttpClient,请阅读 this 了解如何正确使用它。

如果您需要限制并行操作的数量,您也可以使用 TPL Dataflow,因为它非常适合基于任务的 IO 绑定操作。然后应将 SQLBulkLoaderAsync 修改为(完整保留此答案前面的 ProcessFileAsync 方法):

private async Task SQLBulkLoaderAsync()
{
    var ab = new ActionBlock<string>(ProcessFileAsync, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

    foreach (var file in indicators.file_list)
    {
        ab.Post(file.series_id);
    }

    ab.Complete();
    await ab.Completion;
}