`使用 Parallel.ForEach 加快文件处理速度,但无法按正确的顺序 return

`Using Parallel.ForEach to speed up processing of file but cant return in correct order

所以我正在尝试使用 Parallel.ForEach 循环来加速我对文件的处理,但我不知道如何让它以有序的方式构建输出。这是我目前的代码:

string[] lines = File.ReadAllLines(fileName);
List<string> list_lines = new List<string>(lines);

Parallel.ForEach(list_lines, async line =>
{
    processedData += await processSingleLine(line);
});

如您所见,它没有任何有序的实现,因为我已经尝试寻找适合我的解决方案的东西,但我还没有找到任何我已经能够接近工作的东西。
所以最好我希望处理每一行,但按照每行发送的相同顺序构建 processedData 变量,但是我确实意识到这可能超出了我当前的技能水平,所以任何建议都会乖一点。

编辑: 在尝试阅读下面的答案后,我尝试了两种方法:

ConcurrentDictionary<int, string> result = new ConcurrentDictionary<int, string>();
Parallel.For(0, list.Length, i =>
{
    // process your data and save to dict
    result[i] = processData(lines[i]);
});

ConcurrentDictionary<int, string> result = new ConcurrentDictionary<int, string>();
for (var i = 0; i < lines.Length; i++)
{
    result[i] = lines[i];
}
Array.Clear(lines,0, lines.Length);
Parallel.ForEach(result, line =>
{
    result[line.Key] = encrypt(line.Value, key);
});

然而,两者似乎都只使用了大约 1 个核心(4 核处理器),占任务管理器总数的 30%,而在我实施排序之前,它在 CPU 上使用了近 80% .

您可以尝试使用 Parallel.For 而不是 Parallel.ForEach。然后你就会有你的行的索引。即:

string[] lines = File.ReadAllLines(fileName);

// use thread safe collection for catching the results in parallel
ConcurrentDictionary<int, Data> result = new ConcurrentDictionary<int, Data>();

Parallel.For(0, list.Length, i =>
{
    // process your data and save to dict
    result[i] = processData(lines[i]);
});

// having data in dict you can easily retrieve initial order
Data[] orderedData = Data[lines.Length];
for(var i=0; i<lines.Length; i++)
{
    orderedData[i] = result[i];
}

编辑: 正如您在问题下的评论中所说,您不能在此处使用异步方法。当你这样做时,Parallel.ForEach 会给你带来一堆任务,而不是结果。如果你想并行化异步代码,你可以使用多个Task.Run,就像这里:

string[] lines = File.ReadAllLines(fileName);

var tasks = lines.Select(
                 l => Task.Run<Data>(
                         async () => {
                              return await processAsync(l);
                         })).ToList();

var results = await Task.WhenAll(tasks);

注意:应该可以,但没有检查。

我相信 Parallel.ForEach.AsOrdered() 会如您所愿。

从您的代码中获取数据结构 list_lines 和方法 processSingleLine,以下应保留顺序并具有并行执行:

var parallelQuery = from line in list_lines.AsParallel().AsOrdered()
                    select processSingleLine(line);

foreach (var processedLine in parallelQuery)
{
    Console.Write(processedLine);
}