WhenAll on a large number of Tasks
I need your help finding the best solution. Here is my original code:
public async Task Test()
{
    var tasks = new List<Task>();
    string line;
    using (var streamReader = File.OpenText(InputPath))
    {
        while ((line = streamReader.ReadLine()) != null)
        {
            tasks.Add(Process(line));
        }
    }
    await Task.WhenAll(tasks.ToArray());
}
private Task Process(string line)
{
    return Task.Run(() =>
    {
        Console.WriteLine(line);
    });
}
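It reads a file of lines and processes each line in its own task. But if the file has more than 1 million lines, the task list becomes very large. Is this code still OK, or should I look for another solution? Please help. Thanks.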
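This is a bad idea. It queues one task per line up front (over a million of them, all held in the list), and it may start too many threads.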
A better approach is to simply use Parallel.ForEach(), like this:
using System;
using System.IO;
using System.Threading.Tasks;
namespace Demo
{
    static class Program
    {
        static void Main()
        {
            string filename = @"Your test filename goes here";
            Parallel.ForEach(File.ReadLines(filename), process);
        }
        private static void process(string line)
        {
            Console.WriteLine(line);
        }
    }
}
However, this does not use async/await. If you need that, you can wrap the entire call to Parallel.ForEach() in a task.
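A minimal sketch of that wrapping. The ProcessFileAsync helper, its Action<string> parameter and the temporary demo file are illustrative assumptions, not part of the original answer. Task.Run moves the blocking Parallel.ForEach call off the caller's thread so the caller can await it; the per-line work itself is still synchronous and runs on thread-pool threads.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

public static class Program
{
    // Hypothetical helper: runs the synchronous Parallel.ForEach on a pool thread
    // and returns a Task that the caller can await.
    public static Task ProcessFileAsync(string filename, Action<string> process)
    {
        return Task.Run(() =>
        {
            // File.ReadLines enumerates the file lazily, so it is never loaded all at once.
            Parallel.ForEach(File.ReadLines(filename), process);
        });
    }

    public static async Task Main()
    {
        // Demo input: a small temporary file standing in for the real input file.
        string filename = Path.GetTempFileName();
        File.WriteAllLines(filename, new[] { "first", "second", "third" });

        await ProcessFileAsync(filename, line => Console.WriteLine(line));

        File.Delete(filename);
    }
}
```

The returned task completes only after Parallel.ForEach has finished every line, so awaiting it has the same meaning as the original await Task.WhenAll(...), without creating one task per line.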
Alternatively, if you want to use TPL Dataflow (Microsoft's System.Threading.Tasks.Dataflow NuGet package), you can do this:
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace Demo
{
    static class Program
    {
        static void Main()
        {
            Task.Run(test).Wait();
        }
        static async Task test()
        {
            string filename = @"Your filename goes here";
            await processFile(filename);
        }
        static async Task processFile(string filename)
        {
            // At most 8 lines are processed concurrently, and the block holds at most
            // 100 lines at a time (queued plus in progress). When it is full,
            // SendAsync() does not complete until space frees up, so the file is
            // read only as fast as the lines are processed.
            var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8, BoundedCapacity = 100 };
            var action = new ActionBlock<string>(s => process(s), options);
            foreach (var line in File.ReadLines(filename))
                await action.SendAsync(line);
            // Signal that no more lines are coming, then wait for the remaining ones to finish.
            action.Complete();
            await action.Completion;
        }
        static void process(string line)
        {
            Thread.Sleep(100); // Simulate work.
            Console.WriteLine(Thread.CurrentThread.ManagedThreadId + " " + line);
        }
    }
}
This gives you async support.
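On .NET 6 or later, Parallel.ForEachAsync offers a similar bounded, async-aware loop without the Dataflow package. A minimal sketch, assuming .NET 6+; the ProcessFileAsync helper name, the degree of parallelism of 8 and the Task.Delay stand-in for real asynchronous work are illustrative choices, not part of the original answer:

```csharp
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Threading.Tasks;

public static class Program
{
    // Hypothetical helper (requires .NET 6+): processes each line with at most
    // maxParallelism bodies running at once and returns the lines it handled.
    public static async Task<ConcurrentBag<string>> ProcessFileAsync(string filename, int maxParallelism)
    {
        var processed = new ConcurrentBag<string>();
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxParallelism };

        // File.ReadLines is enumerated lazily, and ForEachAsync pulls the next line
        // only when one of its workers is free, so no per-line task list is built.
        await Parallel.ForEachAsync(File.ReadLines(filename), options, async (line, ct) =>
        {
            await Task.Delay(10, ct); // Stand-in for real asynchronous work.
            processed.Add(line);
        });

        return processed;
    }

    public static async Task Main()
    {
        string filename = Path.GetTempFileName();
        File.WriteAllLines(filename, new[] { "alpha", "beta", "gamma" });

        var processed = await ProcessFileAsync(filename, maxParallelism: 8);
        Console.WriteLine($"Processed {processed.Count} lines"); // prints "Processed 3 lines"

        File.Delete(filename);
    }
}
```

Like the ActionBlock version, this caps concurrency; unlike it, there is no separate buffer size to tune, because lines are read on demand.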
Addendum: a demonstration of thread pool throttling.
(This is in response to a comment from shay__.)
If you start many long-running tasks, where each task takes more than a second or so to run, you may see thread pool throttling.
This happens when the number of thread pool threads in the current process equals or exceeds the worker count returned in workers by a call to ThreadPool.GetMinThreads(out workers, out ports).
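When this happens, each new thread pool thread is created only after a short delay (one second on my system). Often this allows another thread pool thread to become available in the meantime, and that thread is used instead (which is, of course, the throttling).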
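Because the threshold is the minimum worker count, one commonly used workaround when you really do need many blocking work items on the pool is to raise that minimum with ThreadPool.SetMinThreads, so the pool creates threads on demand up to the new minimum instead of delaying them. A minimal sketch; the RaiseMinWorkers helper and the target of 32 worker threads are arbitrary assumptions, and a higher minimum trades the injection delay for more threads (more memory and context switching):

```csharp
using System;
using System.Threading;

public static class Program
{
    // Hypothetical helper: raises the minimum worker-thread count to at least
    // `target`, leaving the I/O completion-port minimum unchanged.
    public static bool RaiseMinWorkers(int target)
    {
        ThreadPool.GetMinThreads(out int workers, out int ports);
        // Never lower an existing, larger minimum.
        return ThreadPool.SetMinThreads(Math.Max(workers, target), ports);
    }

    public static void Main()
    {
        ThreadPool.GetMinThreads(out int before, out _);
        bool ok = RaiseMinWorkers(32);
        ThreadPool.GetMinThreads(out int after, out _);
        Console.WriteLine($"Min workers: {before} -> {after} (succeeded: {ok})");
    }
}
```

SetMinThreads returns false (and changes nothing) if the requested value is out of range, for example above the pool's maximum, so the result is worth checking.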
The following code demonstrates the problem:
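using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
static class Program
{
    static void Main()
    {
        int workers, ports;
        ThreadPool.GetMinThreads(out workers, out ports);
        Console.WriteLine("Min workers = " + workers); // Prints 8 on my system.
        var sw = Stopwatch.StartNew();
        // Queue 100 work items that each block a pool thread for 10 seconds.
        for (int i = 0; i < 100; ++i)
        {
            Task.Run(() =>
            {
                Console.WriteLine($"Thread {Thread.CurrentThread.ManagedThreadId} started at time {sw.Elapsed}");
                Thread.Sleep(10000);
            });
        }
        Console.ReadLine(); // Keep the process alive while the tasks run.
    }
}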
On my system, it prints the following:
Min workers = 8
Thread 3 started at time 00:00:00.0098651
Thread 6 started at time 00:00:00.0098651
Thread 8 started at time 00:00:00.0099841
Thread 5 started at time 00:00:00.0099680
Thread 7 started at time 00:00:00.0099918
Thread 4 started at time 00:00:00.0098739
Thread 10 started at time 00:00:00.0100828
Thread 9 started at time 00:00:00.0101833
Thread 11 started at time 00:00:01.0096247
Thread 12 started at time 00:00:02.0098105
Thread 13 started at time 00:00:03.0099824
Thread 14 started at time 00:00:04.0100671
Thread 15 started at time 00:00:05.0098035
Thread 16 started at time 00:00:06.0099449
Thread 17 started at time 00:00:07.0096293
Thread 18 started at time 00:00:08.0106774
Thread 19 started at time 00:00:09.0098193
Thread 20 started at time 00:00:10.0104156
Thread 3 started at time 00:00:10.0109315
Thread 8 started at time 00:00:10.0112171
Thread 7 started at time 00:00:10.0112531
Thread 9 started at time 00:00:10.0117256
Thread 4 started at time 00:00:10.0117920
Thread 10 started at time 00:00:10.0117298
Thread 6 started at time 00:00:10.0109381
Thread 5 started at time 00:00:10.0112276
Thread 21 started at time 00:00:11.0095859
Thread 11 started at time 00:00:11.0101189
Thread 22 started at time 00:00:12.0095421
Thread 12 started at time 00:00:12.0111173
Thread 23 started at time 00:00:13.0095932 ...
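Note how the first 8 threads start very quickly, but after that, new threads are throttled to roughly one per second until the first batch of threads terminates and those threads can be reused.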
Also note that this effect only appears when the tasks take a relatively long time to complete.
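For genuinely long-running blocking work, another option is TaskCreationOptions.LongRunning, a hint that the task should not occupy a pool thread. With the default task scheduler, such a task currently gets its own dedicated thread, so it does not wait for pool thread injection. A minimal sketch; the RunLongRunningTasks helper, the task count and the sleep duration are arbitrary assumptions, and the code records whether each task ran on a pool thread:

```csharp
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

public static class Program
{
    // Hypothetical helper: starts `count` blocking tasks with the LongRunning hint
    // and records, for each one, whether it ran on a thread-pool thread.
    public static bool[] RunLongRunningTasks(int count, int sleepMs)
    {
        var onPool = new ConcurrentBag<bool>();
        var tasks = new Task[count];
        for (int i = 0; i < count; ++i)
        {
            tasks[i] = Task.Factory.StartNew(() =>
            {
                onPool.Add(Thread.CurrentThread.IsThreadPoolThread);
                Thread.Sleep(sleepMs); // Simulate long blocking work.
            }, TaskCreationOptions.LongRunning);
        }
        Task.WaitAll(tasks);
        return onPool.ToArray();
    }

    public static void Main()
    {
        var sw = Stopwatch.StartNew();
        bool[] onPool = RunLongRunningTasks(count: 10, sleepMs: 1000);
        bool anyOnPool = Array.IndexOf(onPool, true) >= 0;
        Console.WriteLine($"{onPool.Length} tasks, any on pool threads: {anyOnPool}, elapsed {sw.Elapsed}");
    }
}
```

Each LongRunning task costs a full thread of its own, so this suits a modest number of long blocking jobs, not one task per line of a million-line file.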