MaxDegreeOfParallelism with Task.Factory.StartNew()
I have a program that downloads files from the Internet and processes them. Below is the function I wrote that uses threads to download a file.
Task<File> re = Task.Factory.StartNew(() => { /*Download the File*/ });
re.ContinueWith((x) => { /*Do another function*/ });
I now want it to use only 10 threads for downloading. I looked at the ParallelOptions.MaxDegreeOfParallelism property, but I can't work out how to use it when the tasks return a result.
You could use something like this:
Func<File> work = () => {
    // Do something
    File file = ...
    return file;
};
var maxNoOfWorkers = 10;
// ToArray() forces the lazy Select to run, so the tasks are actually started.
Task[] tasks = Enumerable.Range(0, maxNoOfWorkers)
    .Select(s =>
    {
        var task = Task.Factory.StartNew<File>(work);
        return task.ContinueWith(ant => { /* do something else */ });
    })
    .ToArray();
This way the TPL decides how many threads to take from the thread pool. If you really want a dedicated (non-thread-pool) thread for each task, you can do it like this:
// ToArray() forces the lazy Select to run, so the tasks are actually started.
Task[] tasks = Enumerable.Range(0, maxNoOfWorkers)
    .Select(s =>
    {
        var task = Task.Factory.StartNew<File>(
            work,
            CancellationToken.None,
            TaskCreationOptions.LongRunning, // hint to run on a dedicated thread
            TaskScheduler.Default);
        return task.ContinueWith(ant => { /* do something else */ });
    })
    .ToArray();
Your other options are PLINQ or Parallel.For/ForEach, both of which let you set MaxDegreeOfParallelism.
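The Parallel.ForEach option is not shown in this answer, so here is a minimal sketch of how it might look when each work item produces a result. The class ThrottledDownloads, the method DownloadAll, and the FakeDownload stand-in are hypothetical names introduced only for illustration; a real download would replace FakeDownload.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledDownloads
{
    // Hypothetical stand-in for a real download: it only returns the URL's length.
    public static int FakeDownload(string url)
    {
        Thread.Sleep(10); // simulate blocking I/O
        return url.Length;
    }

    // Calls `download` once per URL, with at most `maxWorkers` calls running
    // at the same time, and returns the results keyed by URL.
    public static IDictionary<string, T> DownloadAll<T>(
        IEnumerable<string> urls, Func<string, T> download, int maxWorkers)
    {
        // Thread-safe collection, because the loop body runs on several threads.
        var results = new ConcurrentDictionary<string, T>();
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxWorkers };

        // Blocks the caller until every URL has been handled.
        Parallel.ForEach(urls, options, url =>
        {
            results[url] = download(url);
        });

        return results;
    }
}
```

A call such as ThrottledDownloads.DownloadAll<int>(urls, ThrottledDownloads.FakeDownload, 10) would then cap the work at 10 concurrent downloads. Note that Parallel.ForEach blocks the calling thread until all items are done, which may or may not suit your program.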
A PLINQ example could be:
Func<File> work = () => {
    // Do something
    File file = ...
    return file;
};
var maxNoOfWorkers = 10;
ParallelEnumerable.Range(0, maxNoOfWorkers)
    .WithDegreeOfParallelism(maxNoOfWorkers)
    .ForAll(x => {
        var file = work();
        // Do something with file
    });
Of course, I don't know the context of your example, so you may need to adapt this to your requirements.
A good approach is to use the Dataflow API. To use it, you have to install the Microsoft.Tpl.Dataflow NuGet package.
Suppose you have the following methods for downloading and processing data:
public async Task<DownloadResult> DownloadFile(string url)
{
    //Asynchronously download the file and return the result of the download.
    //You don't need a thread to download the file if you use asynchronous API.
}
public ProcessingResult ProcessDownloadResult(DownloadResult download_result)
{
    //Synchronously process the download result and produce a ProcessingResult.
}
Suppose you have a list of URLs to download:
List<string> urls = new List<string>();
You can then use the Dataflow API as follows:
TransformBlock<string, DownloadResult> download_block =
    new TransformBlock<string, DownloadResult>(
        url => DownloadFile(url),
        new ExecutionDataflowBlockOptions
        {
            //Only 10 asynchronous download operations
            //can happen at any point in time.
            MaxDegreeOfParallelism = 10
        });
TransformBlock<DownloadResult, ProcessingResult> process_block =
    new TransformBlock<DownloadResult, ProcessingResult>(
        dr => ProcessDownloadResult(dr),
        new ExecutionDataflowBlockOptions
        {
            //We limit the number of CPU intensive operations
            //to the number of processors in the system.
            MaxDegreeOfParallelism = Environment.ProcessorCount
        });
download_block.LinkTo(process_block);
foreach (var url in urls)
{
    download_block.Post(url);
}
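The snippet above posts the URLs but does not show how to collect the processed results or wait for the pipeline to finish. One possible way to do that is sketched below. It assumes the System.Threading.Tasks.Dataflow assembly is available (that is the name under which the Dataflow package ships on current .NET). The DataflowPipeline class, the RunAsync method, and the string/int stand-ins for DownloadResult and ProcessingResult are hypothetical and exist only to keep the sketch self-contained.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DataflowPipeline
{
    // Hypothetical stand-in for DownloadFile: no real network access.
    public static async Task<string> DownloadFile(string url)
    {
        await Task.Delay(10); // simulate asynchronous I/O
        return "content of " + url;
    }

    // Hypothetical stand-in for ProcessDownloadResult.
    public static int ProcessDownloadResult(string downloaded) => downloaded.Length;

    public static async Task<List<int>> RunAsync(IEnumerable<string> urls)
    {
        var downloadBlock = new TransformBlock<string, string>(
            url => DownloadFile(url),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10 });

        var processBlock = new TransformBlock<string, int>(
            text => ProcessDownloadResult(text),
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = Environment.ProcessorCount
            });

        // An ActionBlock handles one item at a time by default,
        // so a plain List is safe to fill here.
        var results = new List<int>();
        var collectBlock = new ActionBlock<int>(r => results.Add(r));

        // PropagateCompletion passes "no more input" down the chain.
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
        downloadBlock.LinkTo(processBlock, linkOptions);
        processBlock.LinkTo(collectBlock, linkOptions);

        foreach (var url in urls)
        {
            downloadBlock.Post(url);
        }

        downloadBlock.Complete();      // signal that nothing more will be posted
        await collectBlock.Completion; // wait until the last item has been collected
        return results;
    }
}
```

Without a consumer linked to the last TransformBlock, its output simply accumulates in the block's buffer, which is why the sketch adds a collecting ActionBlock at the end.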