使用 C# 读取数百万个小文件
Reading millions of small files with C#
我有数百万个每天生成的日志文件,我需要阅读所有这些文件并将它们放在一起作为一个文件,以便在其他应用程序中对其进行一些处理。
我正在寻找最快的方法。目前我正在使用这样的线程、任务和并行:
Parallel.For(0, files.Length, new ParallelOptions { MaxDegreeOfParallelism = 100 }, i =>
{
ReadFiles(files[i]);
});
void ReadFiles(string file)
{
try
{
var txt = File.ReadAllText(file);
filesTxt.Add(tmp);
}
catch { }
GlobalCls.ThreadNo--;
}
或
foreach (var file in files)
{
//Int64 index = i;
//var file = files[index];
while (Process.GetCurrentProcess().Threads.Count > 100)
{
Thread.Sleep(100);
Application.DoEvents();
}
new Thread(() => ReadFiles(file)).Start();
GlobalCls.ThreadNo++;
// Task.Run(() => ReadFiles(file));
}
问题是读取几千个文件后,读取越来越慢!!
知道为什么吗?读取数百万个小文件的最快方法是什么?谢谢。
说到IO操作,CPU并行就没用了。您的 IO 设备(磁盘、网络等)是您的瓶颈。通过同时读取设备,您可能会降低性能。
也许您可以只使用 PowerShell 来连接文件,例如 this answer。
另一种方法是编写一个程序,使用 FileSystemWatcher class 来监视新文件并在创建文件时附加它们。
您似乎正在加载内存中所有文件的内容,然后再将它们写回单个文件。这可以解释为什么这个过程会随着时间变慢。
优化流程的一种方法是将读取部分与写入部分分开,并并行进行。这称为生产者消费者模式。它可以用 Parallel
class 或线程或任务来实现,但我将演示一个基于强大的 TPL Dataflow library 的实现,它特别适合这样的工作.
private static async Task MergeFiles(IEnumerable<string> sourceFilePaths,
string targetFilePath, CancellationToken cancellationToken = default,
IProgress<int> progress = null)
{
var readerBlock = new TransformBlock<string, string>(async filePath =>
{
return File.ReadAllText(filePath); // Read the small file
}, new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = 2, // Reading is parallelizable
BoundedCapacity = 100, // No more than 100 file-paths buffered
CancellationToken = cancellationToken, // Cancel at any time
});
StreamWriter streamWriter = null;
int filesProcessed = 0;
var writerBlock = new ActionBlock<string>(text =>
{
streamWriter.Write(text); // Append to the target file
filesProcessed++;
if (filesProcessed % 10 == 0) progress?.Report(filesProcessed);
}, new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = 1, // We can't parallelize the writer
BoundedCapacity = 100, // No more than 100 file-contents buffered
CancellationToken = cancellationToken, // Cancel at any time
});
readerBlock.LinkTo(writerBlock,
new DataflowLinkOptions() { PropagateCompletion = true });
// This is a tricky part. We use BoundedCapacity, so we must propagate manually
// a possible failure of the writer to the reader, otherwise a deadlock may occur.
PropagateFailure(writerBlock, readerBlock);
// Open the output stream
using (streamWriter = new StreamWriter(targetFilePath))
{
// Feed the reader with the file paths
foreach (var filePath in sourceFilePaths)
{
var accepted = await readerBlock.SendAsync(filePath,
cancellationToken); // Cancel at any time
if (!accepted) break; // This will happen if the reader fails
}
readerBlock.Complete();
await writerBlock.Completion;
}
async void PropagateFailure(IDataflowBlock block1, IDataflowBlock block2)
{
try { await block1.Completion.ConfigureAwait(false); }
catch (Exception ex)
{
if (block1.Completion.IsCanceled) return; // On cancellation do nothing
block2.Fault(ex);
}
}
}
用法示例:
var cts = new CancellationTokenSource();
var progress = new Progress<int>(value =>
{
// Safe to update the UI
Console.WriteLine($"Files processed: {value:#,0}");
});
var sourceFilePaths = Directory.EnumerateFiles(@"C:\SourceFolder", "*.log",
SearchOption.AllDirectories); // Include subdirectories
await MergeFiles(sourceFilePaths, @"C:\AllLogs.log", cts.Token, progress);
BoundedCapacity
用于控制内存使用。
如果磁盘驱动器是 SSD,您可以尝试使用大于 2 的 MaxDegreeOfParallelism
读取。
为了获得最佳性能,您可以考虑写入与包含源文件的驱动器不同的磁盘驱动器。
TPL 数据流库可用作 .NET Framework 的 a package,并且内置于 .NET Core。
我有数百万个每天生成的日志文件,我需要阅读所有这些文件并将它们放在一起作为一个文件,以便在其他应用程序中对其进行一些处理。
我正在寻找最快的方法。目前我正在使用这样的线程、任务和并行:
Parallel.For(0, files.Length, new ParallelOptions { MaxDegreeOfParallelism = 100 }, i =>
{
ReadFiles(files[i]);
});
void ReadFiles(string file)
{
try
{
var txt = File.ReadAllText(file);
filesTxt.Add(tmp);
}
catch { }
GlobalCls.ThreadNo--;
}
或
foreach (var file in files)
{
//Int64 index = i;
//var file = files[index];
while (Process.GetCurrentProcess().Threads.Count > 100)
{
Thread.Sleep(100);
Application.DoEvents();
}
new Thread(() => ReadFiles(file)).Start();
GlobalCls.ThreadNo++;
// Task.Run(() => ReadFiles(file));
}
问题是读取几千个文件后,读取越来越慢!!
知道为什么吗?读取数百万个小文件的最快方法是什么?谢谢。
说到IO操作,CPU并行就没用了。您的 IO 设备(磁盘、网络等)是您的瓶颈。通过同时读取设备,您可能会降低性能。
也许您可以只使用 PowerShell 来连接文件,例如 this answer。
另一种方法是编写一个程序,使用 FileSystemWatcher class 来监视新文件并在创建文件时附加它们。
您似乎正在加载内存中所有文件的内容,然后再将它们写回单个文件。这可以解释为什么这个过程会随着时间变慢。
优化流程的一种方法是将读取部分与写入部分分开,并并行进行。这称为生产者消费者模式。它可以用 Parallel
class 或线程或任务来实现,但我将演示一个基于强大的 TPL Dataflow library 的实现,它特别适合这样的工作.
private static async Task MergeFiles(IEnumerable<string> sourceFilePaths,
string targetFilePath, CancellationToken cancellationToken = default,
IProgress<int> progress = null)
{
var readerBlock = new TransformBlock<string, string>(async filePath =>
{
return File.ReadAllText(filePath); // Read the small file
}, new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = 2, // Reading is parallelizable
BoundedCapacity = 100, // No more than 100 file-paths buffered
CancellationToken = cancellationToken, // Cancel at any time
});
StreamWriter streamWriter = null;
int filesProcessed = 0;
var writerBlock = new ActionBlock<string>(text =>
{
streamWriter.Write(text); // Append to the target file
filesProcessed++;
if (filesProcessed % 10 == 0) progress?.Report(filesProcessed);
}, new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = 1, // We can't parallelize the writer
BoundedCapacity = 100, // No more than 100 file-contents buffered
CancellationToken = cancellationToken, // Cancel at any time
});
readerBlock.LinkTo(writerBlock,
new DataflowLinkOptions() { PropagateCompletion = true });
// This is a tricky part. We use BoundedCapacity, so we must propagate manually
// a possible failure of the writer to the reader, otherwise a deadlock may occur.
PropagateFailure(writerBlock, readerBlock);
// Open the output stream
using (streamWriter = new StreamWriter(targetFilePath))
{
// Feed the reader with the file paths
foreach (var filePath in sourceFilePaths)
{
var accepted = await readerBlock.SendAsync(filePath,
cancellationToken); // Cancel at any time
if (!accepted) break; // This will happen if the reader fails
}
readerBlock.Complete();
await writerBlock.Completion;
}
async void PropagateFailure(IDataflowBlock block1, IDataflowBlock block2)
{
try { await block1.Completion.ConfigureAwait(false); }
catch (Exception ex)
{
if (block1.Completion.IsCanceled) return; // On cancellation do nothing
block2.Fault(ex);
}
}
}
用法示例:
var cts = new CancellationTokenSource();
var progress = new Progress<int>(value =>
{
// Safe to update the UI
Console.WriteLine($"Files processed: {value:#,0}");
});
var sourceFilePaths = Directory.EnumerateFiles(@"C:\SourceFolder", "*.log",
SearchOption.AllDirectories); // Include subdirectories
await MergeFiles(sourceFilePaths, @"C:\AllLogs.log", cts.Token, progress);
BoundedCapacity
用于控制内存使用。
如果磁盘驱动器是 SSD,您可以尝试使用大于 2 的 MaxDegreeOfParallelism
读取。
为了获得最佳性能,您可以考虑写入与包含源文件的驱动器不同的磁盘驱动器。
TPL 数据流库可用作 .NET Framework 的 a package,并且内置于 .NET Core。