处理超大文本文件时突然内存消耗跳跃导致内存不足异常

Sudden memory consumption jump resulting in out of memory exception while processing huge text file

我需要处理一个非常大的文本文件 (6-8 GB)。我写了下面附上的代码。不幸的是,每次输出文件达到(在源文件旁边创建)达到~2GB,我观察到内存消耗突然跳跃(~100MB 到几 GB),结果 - 内存不足异常.

调试器指示 OOM 发生在 while ((tempLine = streamReader.ReadLine()) != null) 我只针对 .NET 4.7 和 x64 架构。 单行最多 50 个字符。

我可以解决这个问题,将原始文件分割成更小的部分,这样在处理时不会遇到问题,最后将结果合并回一个文件,但我不想这样做。

代码:

public async Task PerformDecodeAsync(string sourcePath, string targetPath)
    {
        var allLines = CountLines(sourcePath);
        long processedlines = default;
        using (File.Create(targetPath));
        var streamWriter = File.AppendText(targetPath);
        var decoderBlockingCollection = new BlockingCollection<string>(1000);
        var writerBlockingCollection = new BlockingCollection<string>(1000);

        var producer = Task.Factory.StartNew(() =>
        {
            using (var streamReader = new StreamReader(File.OpenRead(sourcePath), Encoding.Default, true))
            {
                string tempLine;
                while ((tempLine = streamReader.ReadLine()) != null)
                {
                    decoderBlockingCollection.Add(tempLine);
                }
                decoderBlockingCollection.CompleteAdding();

            }
        });
        var consumer1 = Task.Factory.StartNew(() =>
        {
            foreach (var line in decoderBlockingCollection.GetConsumingEnumerable())
            {
                short decodeCounter = 0;
                StringBuilder builder = new StringBuilder();
                foreach (var singleChar in line)
                {

                    var positionInDecodeKey = decodingKeysList[decodeCounter].IndexOf(singleChar);

                    if (positionInDecodeKey > 0)
                        builder.Append(model.Substring(positionInDecodeKey, 1));
                    else
                        builder.Append(singleChar);


                    if (decodeCounter > 18)
                        decodeCounter = 0;
                    else ++decodeCounter;
                }
                writerBlockingCollection.TryAdd(builder.ToString());
                Interlocked.Increment(ref processedlines);
                if (processedlines == (long)allLines)
                    writerBlockingCollection.CompleteAdding();
            }
        });
        var writer = Task.Factory.StartNew(() =>
        {
            foreach (var line in writerBlockingCollection.GetConsumingEnumerable())
            {
                streamWriter.WriteLine(line);
            }
        });
        Task.WaitAll(producer, consumer1, writer);
    }

非常感谢解决方案以及如何进一步优化它的建议。

由于您所做的工作主要受 IO 限制,因此您实际上并没有从并行化中获得任何好处。在我看来(如果我错了请纠正我)你的转换算法不依赖于你逐行阅读文件,所以我建议改为做这样的事情:

void Main()
{
    //Setup streams for testing
    using(var inputStream = new MemoryStream())
    using(var outputStream = new MemoryStream())
    using (var inputWriter = new StreamWriter(inputStream))
    using (var outputReader = new StreamReader(outputStream))
    {
        //Write test string and rewind stream
        inputWriter.Write("abcdefghijklmnop");
        inputWriter.Flush();
        inputStream.Seek(0, SeekOrigin.Begin);

        var inputBuffer = new byte[5];
        var outputBuffer = new byte[5];
        int inputLength;
        while ((inputLength = inputStream.Read(inputBuffer, 0, inputBuffer.Length)) > 0)
        {
            for (var i = 0; i < inputLength; i++)
            {
                //transform each character
                outputBuffer[i] = ++inputBuffer[i];
            }

            //Write to output
            outputStream.Write(outputBuffer, 0, inputLength);
        }

        //Read for testing
        outputStream.Seek(0, SeekOrigin.Begin);
        var output = outputReader.ReadToEnd();
        Console.WriteLine(output);

        //Outputs: "bcdefghijklmnopq"
    }

}

显然,您将使用 FileStreams 而不是 MemoryStreams,并且您可以将缓冲区长度增加到更大的长度(因为这只是一个演示示例)。此外,由于您的原始方法是异步的,因此您使用 Stream.Write 和 Stream.Read

的异步变体

就像我说的,我可能会先选择更简单的东西,除非或直到它被证明表现不佳。正如 Adi 在他们的回答中所说,这项工作似乎是 I/O 绑定的 - 因此为其创建多个任务似乎没有什么好处。

publiv void PerformDecode(string sourcePath, string targetPath)
{
    File.WriteAllLines(targetPath,File.ReadLines(sourcePath).Select(line=>{
        short decodeCounter = 0;
        StringBuilder builder = new StringBuilder();
        foreach (var singleChar in line)
        {
            var positionInDecodeKey = decodingKeysList[decodeCounter].IndexOf(singleChar);
            if (positionInDecodeKey > 0)
                builder.Append(model.Substring(positionInDecodeKey, 1));
            else
                builder.Append(singleChar);

            if (decodeCounter > 18)
                decodeCounter = 0;
            else ++decodeCounter;
        }
        return builder.ToString();
    }));
}

现在,当然,这段代码实际上是阻塞的,直到它完成,这就是我没有标记它的原因 async。但是,你的也一样,它应该已经警告过了。

(您可以尝试对 Select 部分使用 PLINQ 而不是 LINQ,但老实说,我们在这里所做的 processing 的数量看起来微不足道;先进行配置文件应用任何此类更改)