处理超大文本文件时突然内存消耗跳跃导致内存不足异常
Sudden memory consumption jump resulting in out of memory exception while processing huge text file
我需要处理一个非常大的文本文件 (6-8 GB)。我写了下面附上的代码。不幸的是,每次输出文件达到(在源文件旁边创建)达到~2GB,我观察到内存消耗突然跳跃(~100MB 到几 GB),结果 - 内存不足异常.
调试器指示 OOM 发生在 while ((tempLine = streamReader.ReadLine()) != null)
我只针对 .NET 4.7 和 x64 架构。
单行最多 50 个字符。
我可以解决这个问题,将原始文件分割成更小的部分,这样在处理时不会遇到问题,最后将结果合并回一个文件,但我不想这样做。
代码:
public async Task PerformDecodeAsync(string sourcePath, string targetPath)
{
var allLines = CountLines(sourcePath);
long processedlines = default;
using (File.Create(targetPath));
var streamWriter = File.AppendText(targetPath);
var decoderBlockingCollection = new BlockingCollection<string>(1000);
var writerBlockingCollection = new BlockingCollection<string>(1000);
var producer = Task.Factory.StartNew(() =>
{
using (var streamReader = new StreamReader(File.OpenRead(sourcePath), Encoding.Default, true))
{
string tempLine;
while ((tempLine = streamReader.ReadLine()) != null)
{
decoderBlockingCollection.Add(tempLine);
}
decoderBlockingCollection.CompleteAdding();
}
});
var consumer1 = Task.Factory.StartNew(() =>
{
foreach (var line in decoderBlockingCollection.GetConsumingEnumerable())
{
short decodeCounter = 0;
StringBuilder builder = new StringBuilder();
foreach (var singleChar in line)
{
var positionInDecodeKey = decodingKeysList[decodeCounter].IndexOf(singleChar);
if (positionInDecodeKey > 0)
builder.Append(model.Substring(positionInDecodeKey, 1));
else
builder.Append(singleChar);
if (decodeCounter > 18)
decodeCounter = 0;
else ++decodeCounter;
}
writerBlockingCollection.TryAdd(builder.ToString());
Interlocked.Increment(ref processedlines);
if (processedlines == (long)allLines)
writerBlockingCollection.CompleteAdding();
}
});
var writer = Task.Factory.StartNew(() =>
{
foreach (var line in writerBlockingCollection.GetConsumingEnumerable())
{
streamWriter.WriteLine(line);
}
});
Task.WaitAll(producer, consumer1, writer);
}
非常感谢解决方案以及如何进一步优化它的建议。
由于您所做的工作主要受 IO 限制,因此您实际上并没有从并行化中获得任何好处。在我看来(如果我错了请纠正我)你的转换算法不依赖于你逐行阅读文件,所以我建议改为做这样的事情:
void Main()
{
//Setup streams for testing
using(var inputStream = new MemoryStream())
using(var outputStream = new MemoryStream())
using (var inputWriter = new StreamWriter(inputStream))
using (var outputReader = new StreamReader(outputStream))
{
//Write test string and rewind stream
inputWriter.Write("abcdefghijklmnop");
inputWriter.Flush();
inputStream.Seek(0, SeekOrigin.Begin);
var inputBuffer = new byte[5];
var outputBuffer = new byte[5];
int inputLength;
while ((inputLength = inputStream.Read(inputBuffer, 0, inputBuffer.Length)) > 0)
{
for (var i = 0; i < inputLength; i++)
{
//transform each character
outputBuffer[i] = ++inputBuffer[i];
}
//Write to output
outputStream.Write(outputBuffer, 0, inputLength);
}
//Read for testing
outputStream.Seek(0, SeekOrigin.Begin);
var output = outputReader.ReadToEnd();
Console.WriteLine(output);
//Outputs: "bcdefghijklmnopq"
}
}
显然,您将使用 FileStreams 而不是 MemoryStreams,并且您可以将缓冲区长度增加到更大的长度(因为这只是一个演示示例)。此外,由于您的原始方法是异步的,因此您使用 Stream.Write 和 Stream.Read
的异步变体
就像我说的,我可能会先选择更简单的东西,除非或直到它被证明表现不佳。正如 Adi 在他们的回答中所说,这项工作似乎是 I/O 绑定的 - 因此为其创建多个任务似乎没有什么好处。
publiv void PerformDecode(string sourcePath, string targetPath)
{
File.WriteAllLines(targetPath,File.ReadLines(sourcePath).Select(line=>{
short decodeCounter = 0;
StringBuilder builder = new StringBuilder();
foreach (var singleChar in line)
{
var positionInDecodeKey = decodingKeysList[decodeCounter].IndexOf(singleChar);
if (positionInDecodeKey > 0)
builder.Append(model.Substring(positionInDecodeKey, 1));
else
builder.Append(singleChar);
if (decodeCounter > 18)
decodeCounter = 0;
else ++decodeCounter;
}
return builder.ToString();
}));
}
现在,当然,这段代码实际上是阻塞的,直到它完成,这就是我没有标记它的原因 async
。但是,你的也一样,它应该已经警告过了。
(您可以尝试对 Select
部分使用 PLINQ 而不是 LINQ,但老实说,我们在这里所做的 processing 的数量看起来微不足道;先进行配置文件应用任何此类更改)
我需要处理一个非常大的文本文件 (6-8 GB)。我写了下面附上的代码。不幸的是,每次输出文件达到(在源文件旁边创建)达到~2GB,我观察到内存消耗突然跳跃(~100MB 到几 GB),结果 - 内存不足异常.
调试器指示 OOM 发生在 while ((tempLine = streamReader.ReadLine()) != null)
我只针对 .NET 4.7 和 x64 架构。
单行最多 50 个字符。
我可以解决这个问题,将原始文件分割成更小的部分,这样在处理时不会遇到问题,最后将结果合并回一个文件,但我不想这样做。
代码:
public async Task PerformDecodeAsync(string sourcePath, string targetPath)
{
var allLines = CountLines(sourcePath);
long processedlines = default;
using (File.Create(targetPath));
var streamWriter = File.AppendText(targetPath);
var decoderBlockingCollection = new BlockingCollection<string>(1000);
var writerBlockingCollection = new BlockingCollection<string>(1000);
var producer = Task.Factory.StartNew(() =>
{
using (var streamReader = new StreamReader(File.OpenRead(sourcePath), Encoding.Default, true))
{
string tempLine;
while ((tempLine = streamReader.ReadLine()) != null)
{
decoderBlockingCollection.Add(tempLine);
}
decoderBlockingCollection.CompleteAdding();
}
});
var consumer1 = Task.Factory.StartNew(() =>
{
foreach (var line in decoderBlockingCollection.GetConsumingEnumerable())
{
short decodeCounter = 0;
StringBuilder builder = new StringBuilder();
foreach (var singleChar in line)
{
var positionInDecodeKey = decodingKeysList[decodeCounter].IndexOf(singleChar);
if (positionInDecodeKey > 0)
builder.Append(model.Substring(positionInDecodeKey, 1));
else
builder.Append(singleChar);
if (decodeCounter > 18)
decodeCounter = 0;
else ++decodeCounter;
}
writerBlockingCollection.TryAdd(builder.ToString());
Interlocked.Increment(ref processedlines);
if (processedlines == (long)allLines)
writerBlockingCollection.CompleteAdding();
}
});
var writer = Task.Factory.StartNew(() =>
{
foreach (var line in writerBlockingCollection.GetConsumingEnumerable())
{
streamWriter.WriteLine(line);
}
});
Task.WaitAll(producer, consumer1, writer);
}
非常感谢解决方案以及如何进一步优化它的建议。
由于您所做的工作主要受 IO 限制,因此您实际上并没有从并行化中获得任何好处。在我看来(如果我错了请纠正我)你的转换算法不依赖于你逐行阅读文件,所以我建议改为做这样的事情:
void Main()
{
//Setup streams for testing
using(var inputStream = new MemoryStream())
using(var outputStream = new MemoryStream())
using (var inputWriter = new StreamWriter(inputStream))
using (var outputReader = new StreamReader(outputStream))
{
//Write test string and rewind stream
inputWriter.Write("abcdefghijklmnop");
inputWriter.Flush();
inputStream.Seek(0, SeekOrigin.Begin);
var inputBuffer = new byte[5];
var outputBuffer = new byte[5];
int inputLength;
while ((inputLength = inputStream.Read(inputBuffer, 0, inputBuffer.Length)) > 0)
{
for (var i = 0; i < inputLength; i++)
{
//transform each character
outputBuffer[i] = ++inputBuffer[i];
}
//Write to output
outputStream.Write(outputBuffer, 0, inputLength);
}
//Read for testing
outputStream.Seek(0, SeekOrigin.Begin);
var output = outputReader.ReadToEnd();
Console.WriteLine(output);
//Outputs: "bcdefghijklmnopq"
}
}
显然,您将使用 FileStreams 而不是 MemoryStreams,并且您可以将缓冲区长度增加到更大的长度(因为这只是一个演示示例)。此外,由于您的原始方法是异步的,因此您使用 Stream.Write 和 Stream.Read
的异步变体就像我说的,我可能会先选择更简单的东西,除非或直到它被证明表现不佳。正如 Adi 在他们的回答中所说,这项工作似乎是 I/O 绑定的 - 因此为其创建多个任务似乎没有什么好处。
publiv void PerformDecode(string sourcePath, string targetPath)
{
File.WriteAllLines(targetPath,File.ReadLines(sourcePath).Select(line=>{
short decodeCounter = 0;
StringBuilder builder = new StringBuilder();
foreach (var singleChar in line)
{
var positionInDecodeKey = decodingKeysList[decodeCounter].IndexOf(singleChar);
if (positionInDecodeKey > 0)
builder.Append(model.Substring(positionInDecodeKey, 1));
else
builder.Append(singleChar);
if (decodeCounter > 18)
decodeCounter = 0;
else ++decodeCounter;
}
return builder.ToString();
}));
}
现在,当然,这段代码实际上是阻塞的,直到它完成,这就是我没有标记它的原因 async
。但是,你的也一样,它应该已经警告过了。
(您可以尝试对 Select
部分使用 PLINQ 而不是 LINQ,但老实说,我们在这里所做的 processing 的数量看起来微不足道;先进行配置文件应用任何此类更改)