C# 中的异步文件 I/O 开销

Async file I/O overhead in C#

我遇到了一个问题,我必须处理一大批大型 jsonl 文件(读取、反序列化、进行一些转换数据库查找等,然后将转换后的结果写入 .net 核心控制台应用程序。

我通过将输出分批放在单独的线程上获得了更好的吞吐量,并试图通过添加一些并行性来改进处理方面,但开销最终弄巧成拙。

我一直在做:

using (var stream = new FileStream(_filePath, FileMode.Open))
using (var reader = new StreamReader(stream)
{
    for (;;)
    {
        var l = reader.ReadLine();
        if (l == null)
            break;
        // Deserialize
        // Do some database lookups
        // Do some transforms
        // Pass result to output thread
    }
}

一些诊断时间告诉我 ReadLine() 调用比反序列化等花费的时间更多。为了给它加上一些数字,一个大文件大约有:

我想将那 11 秒的文件 i/o 与其他作品重叠,所以我尝试了

using (var stream = new FileStream(_filePath, FileMode.Open))
using (var reader = new StreamReader(stream)
{
    var nextLine = reader.ReadLineAsync();
    for (;;)
    {
        var l = nextLine.Result;
        if (l == null)
            break;
        nextLine = reader.ReadLineAsync();
        // Deserialize
        // Do some database lookups
        // Do some transforms
        // Pass result to output thread
    }
}

为了让下一个 I/O 在我做转换的时候继续进行。只是最终花费的时间比常规同步的时间长得多(比如两倍的时间)。

我有要求他们希望整体结果具有可预测性(即必须按名称顺序处理同一组文件,并且输出行必须按可预测的相同顺序进行处理)所以我不能只需为每个线程抛出一个文件,然后让他们解决问题。

我只是想引入足够的并行性来平滑大量输入的吞吐量,我很惊讶上述结果适得其反。

我是不是遗漏了什么?

11 seconds spent on ReadLine

更具体地说,花在文件 I/O 上的 11 秒,但您没有测量。

将您创建的流替换为:

using var reader = new StreamReader(_filePath, Encoding.UTF8, false, 50 * 1024 * 1024);

这将导致它读取到 50MB 的缓冲区(根据需要调整大小)以避免在看似古老的硬盘驱动器上重复 I/O。

I was just trying to introduce enough parallelism to smooth the throughput

你不仅根本没有引入任何并行性,而且你用错了 ReadLineAsync -- 它 returns 是 Task<string>,而不是 string

这完全是矫枉过正,增加缓冲区大小很可能会解决您的问题,但如果您想真正做到这一点,您需要两个通过共享数据结构进行通信的线程,正如彼得所说。

Only that ended up taking a lot longer than the regular sync stuff

让我感到困惑的是人们认为多线程代码应该比单线程代码占用更少的处理能力。当今的教育必须缺少一些真正基本的理解才能导致这一点。多线程包括多个额外的上下文切换、互斥锁争用、您的 OS 调度程序启动以替换您的线程之一(导致饥饿或过饱和)、在工作完成后收集、序列化和聚合结果等。None 其中免费或易于实施。

内置异步文件系统 API , and you are advised to avoid them. Not only they are much slower than their synchronous counterparts, but they are not even truly asynchronous. The .NET 6 will come with an improved FileStream 实现,因此在几个月后这可能不再是问题。

您要实现的目标称为任务并行性,其中两个或多个异构操作 运行 并发且彼此独立。这是一项先进的技术,需要专门的工具。最常见的并行类型是所谓的数据并行,其中相同类型的操作在同类数据列表上并行 运行,通常使用 Parallel class 或 PLINQ 库。

要实现任务并行,最容易获得的工具是最后一个块的 TPL Dataflow library, which is built-in the .NET Core / .NET 5 platforms, and you only need to install a package if you are targeting the .NET Framework. This library allows you to create a pipeline consisting of linked components that are called "blocks" (TransformBlock, ActionBlock, BatchBlock etc), where each block acts as an independent processor with its own input and output queues. You feed the pipeline with data, and the data flows from block to block through the pipeline, while being processed along the way. You Complete the first block in the pipeline to signal that no more input data will ever be available, and then await the Completion,它使您的代码等待所有工作完成。这是一个例子:

private async void Button1_Click(object sender, EventArgs e)
{
    Button1.Enabled = false;
    var fileBlock = new TransformManyBlock<string, IList<string>>(filePath =>
    {
        return File.ReadLines(filePath).Buffer(10);
    });

    var deserializeBlock = new TransformBlock<IList<string>, MyObject[]>(lines =>
    {
        return lines.Select(line => Deserialize(line)).ToArray();
    }, new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = 2 // Let's assume that Deserialize is parallelizable
    });

    var persistBlock = new TransformBlock<MyObject[], MyObject[]>(async objects =>
    {
        foreach (MyObject obj in objects) await PersistToDbAsync(obj);
        return objects;
    });

    var displayBlock = new ActionBlock<MyObject[]>(objects =>
    {
        foreach (MyObject obj in objects) TextBox1.AppendText($"{obj}\r\n");
    }, new ExecutionDataflowBlockOptions()
    {
        TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext()
        // Make sure that the delegate will be invoked on the UI thread
    });

    fileBlock.LinkTo(deserializeBlock,
        new DataflowLinkOptions { PropagateCompletion = true });
    deserializeBlock.LinkTo(persistBlock,
        new DataflowLinkOptions { PropagateCompletion = true });
    persistBlock.LinkTo(displayBlock,
        new DataflowLinkOptions { PropagateCompletion = true });

    foreach (var filePath in Directory.GetFiles(@"C:\Data"))
        await fileBlock.SendAsync(filePath);

    fileBlock.Complete();
    await displayBlock.Completion;
    MessageBox.Show("Done");
    Button1.Enabled = true;
}

通过管道传递的数据应该是块状的。如果每个工作单元都太轻量级,你应该在数组或列表中对它们进行批处理,否则移动大量微小数据的开销将超过并行的好处。这就是使用 Buffer LINQ operator (from the System.Interactive package) in the above example. The .NET 6 will come with a new Chunk LINQ 运算符提供相同功能的原因。

Theodor 的建议看起来是一个非常强大和有用的库,值得一试,但如果您正在寻找更小的 DIY 解决方案,我会采用以下方法:

using System;
using System.IO;
using System.Threading.Tasks;
using System.Collections.Generic;

namespace Parallelism
{
    class Program
    {
        private static Queue<string> _queue = new Queue<string>();
        private static Task _lastProcessTask;
        
        static async Task Main(string[] args)
        {
            string path = "???";
            await ReadAndProcessAsync(path);
        }

        private static async Task ReadAndProcessAsync(string path)
        {
            using (var str = File.OpenRead(path))
            using (var sr = new StreamReader(str))
            {
                string line = null;
                while (true)
                {
                    line = await sr.ReadLineAsync();
                    if (line == null)
                        break;

                    lock (_queue)
                    {
                        _queue.Enqueue(line);
                        if (_queue.Count == 1)
                            // There was nothing in the queue before
                            // so initiate a new processing loop. Save 
                            // but DON'T await the Task yet.
                            _lastProcessTask = ProcessQueueAsync();
                    }
                }                
            }

            // Now that file reading is completed, await 
            // _lastProcessTask to ensure we don't return
            // before it's finished.
            await _lastProcessTask;
        }

        // This will continue processing as long as lines are in the queue,
        // including new lines entering the queue while processing earlier ones.
        private static Task ProcessQueueAsync()
        {
            return Task.Run(async () =>
            {
                while (true)
                {
                    string line;
                    lock (_queue)
                    {              
                        // Only peak at first so the read loop doesn't think
                        // the queue is empty and initiate a second processing
                        // loop while we're processing this line.
                        if (!_queue.TryPeek(out line))
                            return;
                    }
                    await ProcessLineAsync(line);
                    lock (_queue)
                    {
                        // Dequeues the item we just processed. If it's the last
                        // one, this loop is done.
                        _queue.Dequeue();
                        if (_queue.Count == 0)
                            return;
                    }
                }
            });
        }

        private static async Task ProcessLineAsync(string line)
        {
            // do something
        }
    }
}

请注意,此方法有一个处理循环,当队列中没有剩余时终止,并在新项目准备就绪时根据需要重新启动。另一种方法是有一个连续的处理循环,在队列为空时重复重新检查并执行 Task.Delay() 一小段时间。我更喜欢我的方法,因为它不会因定期和不必要的检查而使工作线程陷入困境,但性能可能会有不明显的不同。

另外,为了评论 Blindy 的回答,我不同意在这里劝阻使用并行性。首先,如今大多数 CPU 都是多核的,因此当 运行 在多核 CPU 上时,明智地使用 .NET 线程池实际上可以最大限度地提高应用程序的效率,并且具有在单核场景中的缺点非常小。

不过,更重要的是,异步 不等于 多线程。异步编程早在多线程之前就存在了,I/O 是最著名的例子。 I/O 操作大部分由硬件处理 除了 CPU - NIC、SATA 控制器等。它们使用一个古老的概念,称为 Hardware Interrupt 今天的大多数编码人员可能从未听说过它,它比多线程早几十年。它基本上只是一种在 off-CPU 操作完成时为 CPU 提供回调以执行的方法。因此,当您使用行为良好的异步 API(尽管 .NET FileStream 存在 Theodore 提到的问题)时,您的 CPU 实际上根本不应该做那么多工作。当你 await 这样的 API 时,CPU 基本上处于空闲状态,直到机器中的其他硬件将请求的数据写入 RAM。

我同意 Blindy 的观点,如果计算机科学程序能够更好地教人们计算机硬件的实际工作原理,那就更好了。希望利用 CPU 可以在等待从磁盘、网络等处读取数据的同时做其他事情这一事实,用柯克船长的话来说,就是“军官思维” .