`使用 Parallel.ForEach 加快文件处理速度,但无法按正确的顺序 return
`Using Parallel.ForEach to speed up processing of file but cant return in correct order
所以我正在尝试使用 Parallel.ForEach
循环来加速我对文件的处理,但我不知道如何让它以有序的方式构建输出。这是我目前的代码:
string[] lines = File.ReadAllLines(fileName);
List<string> list_lines = new List<string>(lines);
Parallel.ForEach(list_lines, async line =>
{
processedData += await processSingleLine(line);
});
如您所见,它没有任何有序的实现,因为我已经尝试寻找适合我的解决方案的东西,但我还没有找到任何我已经能够接近工作的东西。
所以最好我希望处理每一行,但按照每行发送的相同顺序构建 processedData
变量,但是我确实意识到这可能超出了我当前的技能水平,所以任何建议都会乖一点。
编辑:
在尝试阅读下面的答案后,我尝试了两种方法:
ConcurrentDictionary<int, string> result = new ConcurrentDictionary<int, string>();
Parallel.For(0, list.Length, i =>
{
// process your data and save to dict
result[i] = processData(lines[i]);
});
和
ConcurrentDictionary<int, string> result = new ConcurrentDictionary<int, string>();
for (var i = 0; i < lines.Length; i++)
{
result[i] = lines[i];
}
Array.Clear(lines,0, lines.Length);
Parallel.ForEach(result, line =>
{
result[line.Key] = encrypt(line.Value, key);
});
然而,两者似乎都只使用了大约 1 个核心(4 核处理器),占任务管理器总数的 30%,而在我实施排序之前,它在 CPU 上使用了近 80% .
您可以尝试使用 Parallel.For
而不是 Parallel.ForEach
。然后你就会有你的行的索引。即:
string[] lines = File.ReadAllLines(fileName);
// use thread safe collection for catching the results in parallel
ConcurrentDictionary<int, Data> result = new ConcurrentDictionary<int, Data>();
Parallel.For(0, list.Length, i =>
{
// process your data and save to dict
result[i] = processData(lines[i]);
});
// having data in dict you can easily retrieve initial order
Data[] orderedData = Data[lines.Length];
for(var i=0; i<lines.Length; i++)
{
orderedData[i] = result[i];
}
编辑: 正如您在问题下的评论中所说,您不能在此处使用异步方法。当你这样做时,Parallel.ForEach
会给你带来一堆任务,而不是结果。如果你想并行化异步代码,你可以使用多个Task.Run
,就像这里:
string[] lines = File.ReadAllLines(fileName);
var tasks = lines.Select(
l => Task.Run<Data>(
async () => {
return await processAsync(l);
})).ToList();
var results = await Task.WhenAll(tasks);
注意:应该可以,但没有检查。
我相信 Parallel.ForEach.AsOrdered() 会如您所愿。
从您的代码中获取数据结构 list_lines 和方法 processSingleLine,以下应保留顺序并具有并行执行:
var parallelQuery = from line in list_lines.AsParallel().AsOrdered()
select processSingleLine(line);
foreach (var processedLine in parallelQuery)
{
Console.Write(processedLine);
}
所以我正在尝试使用 Parallel.ForEach
循环来加速我对文件的处理,但我不知道如何让它以有序的方式构建输出。这是我目前的代码:
string[] lines = File.ReadAllLines(fileName);
List<string> list_lines = new List<string>(lines);
Parallel.ForEach(list_lines, async line =>
{
processedData += await processSingleLine(line);
});
如您所见,它没有任何有序的实现,因为我已经尝试寻找适合我的解决方案的东西,但我还没有找到任何我已经能够接近工作的东西。
所以最好我希望处理每一行,但按照每行发送的相同顺序构建 processedData
变量,但是我确实意识到这可能超出了我当前的技能水平,所以任何建议都会乖一点。
编辑: 在尝试阅读下面的答案后,我尝试了两种方法:
ConcurrentDictionary<int, string> result = new ConcurrentDictionary<int, string>();
Parallel.For(0, list.Length, i =>
{
// process your data and save to dict
result[i] = processData(lines[i]);
});
和
ConcurrentDictionary<int, string> result = new ConcurrentDictionary<int, string>();
for (var i = 0; i < lines.Length; i++)
{
result[i] = lines[i];
}
Array.Clear(lines,0, lines.Length);
Parallel.ForEach(result, line =>
{
result[line.Key] = encrypt(line.Value, key);
});
然而,两者似乎都只使用了大约 1 个核心(4 核处理器),占任务管理器总数的 30%,而在我实施排序之前,它在 CPU 上使用了近 80% .
您可以尝试使用 Parallel.For
而不是 Parallel.ForEach
。然后你就会有你的行的索引。即:
string[] lines = File.ReadAllLines(fileName);
// use thread safe collection for catching the results in parallel
ConcurrentDictionary<int, Data> result = new ConcurrentDictionary<int, Data>();
Parallel.For(0, list.Length, i =>
{
// process your data and save to dict
result[i] = processData(lines[i]);
});
// having data in dict you can easily retrieve initial order
Data[] orderedData = Data[lines.Length];
for(var i=0; i<lines.Length; i++)
{
orderedData[i] = result[i];
}
编辑: 正如您在问题下的评论中所说,您不能在此处使用异步方法。当你这样做时,Parallel.ForEach
会给你带来一堆任务,而不是结果。如果你想并行化异步代码,你可以使用多个Task.Run
,就像这里:
string[] lines = File.ReadAllLines(fileName);
var tasks = lines.Select(
l => Task.Run<Data>(
async () => {
return await processAsync(l);
})).ToList();
var results = await Task.WhenAll(tasks);
注意:应该可以,但没有检查。
我相信 Parallel.ForEach.AsOrdered() 会如您所愿。
从您的代码中获取数据结构 list_lines 和方法 processSingleLine,以下应保留顺序并具有并行执行:
var parallelQuery = from line in list_lines.AsParallel().AsOrdered()
select processSingleLine(line);
foreach (var processedLine in parallelQuery)
{
Console.Write(processedLine);
}