App that uses Task Scheduler quickly runs out of memory
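The application parses files in a certain directory while new files are being added to that directory. I use a ConcurrentQueue and try to split the work across the number of cores, so if there are files to process, it should process up to 4 (the core count) files at the same time.
However, after processing 10-30 files, the application runs out of memory (OOM) within a few seconds. I can see memory consumption quickly grow to ~1.5 GB, and then an OOM error occurs.
I'm new to the task scheduler, so I'm probably doing something wrong.
File parsing is done by running some .exe on the file, which uses < 5 MB of RAM.
The task scheduling code runs every time the timer elapses, but the app runs out of memory before the timer even elapses a second time.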
private void OnTimedEvent(object source, ElapsedEventArgs e)
{
    DirectoryInfo info = new DirectoryInfo(AssemblyDirectory);
    FileInfo[] allSrcFiles = info.GetFiles("*.dat").OrderBy(p => p.CreationTime).ToArray();
    var validSrcFiles = allSrcFiles.Where(p => (DateTime.Now - p.CreationTime) > TimeSpan.FromSeconds(60));
    var newFilesToParse = validSrcFiles.Where(f => !ProcessedFiles.Contains(f.Name));
    if (newFilesToParse.Any()) Console.WriteLine("Adding " + newFilesToParse.Count() + " files to the Queue");
    foreach (var file in newFilesToParse)
    {
        FilesToParseQueue.Enqueue(file);
        ProcessedFiles.Add(file.Name);
    }
    if (!busy)
    {
        if (FilesToParseQueue.Any())
        {
            busy = true;
            Console.WriteLine("");
            Console.WriteLine("There are " + FilesToParseQueue.Count + " files in queue. Processing...");
        }
        var scheduler = new LimitedConcurrencyLevelTaskScheduler(coresCount); //4
        TaskFactory factory = new TaskFactory(scheduler);
        while (FilesToParseQueue.Any())
        {
            factory.StartNew(() =>
            {
                FileInfo file;
                if (FilesToParseQueue.TryDequeue(out file))
                {
                    //Dequeue();
                    ParseFile(file);
                }
            });
        }
        if (!FilesToParseQueue.Any())
        {
            busy = false;
            Console.WriteLine("Finished processing Files in the Queue. Waiting for new files...");
        }
    }
}
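Your code keeps creating new Tasks for as long as there are files in the queue, and it creates them much faster than the files can be processed. Because the dequeue happens inside each task, the while (FilesToParseQueue.Any()) loop itself never shrinks the queue: it keeps spinning and queuing more tasks until the scheduler gets round to running enough of them. Nothing else bounds it (such as the number of files in the directory), which is why it runs out of memory so quickly.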
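To make the effect concrete, here is a small console sketch (C# 9+ top-level statements) that mimics the question's loop with integers instead of files. The gate and the safety cap are artificial additions that are not in the question's code: the gate keeps every task from dequeuing while the loop runs, which makes the outcome deterministic, and the cap exists only so the demo terminates.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// 100 integers stand in for the queued FileInfo objects.
var queue = new ConcurrentQueue<int>();
for (int i = 0; i < 100; i++) queue.Enqueue(i);

// Simulates "the scheduler has not got round to running these tasks yet".
var gate = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var tasks = new List<Task>();
int processed = 0;
const int SafetyCap = 10_000; // without a cap this loop would never end

// Same shape as the question: check the queue here, dequeue later inside the task.
while (!queue.IsEmpty && tasks.Count < SafetyCap)
{
    tasks.Add(Task.Run(async () =>
    {
        await gate.Task;
        if (queue.TryDequeue(out _)) Interlocked.Increment(ref processed);
    }));
}

int tasksCreated = tasks.Count;
gate.SetResult(true);
Task.WaitAll(tasks.ToArray());

// prints "10000 tasks created for 100 items"
Console.WriteLine($"{tasksCreated} tasks created for {processed} items");
```

Only 100 of those tasks do any work; the rest find the queue empty. In the real app nothing caps the loop, so the task count keeps climbing until memory runs out.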
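A simple fix is to move the dequeue out of the task and into the loop, so that a task is only created for a file that has actually been dequeued: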
while (true)
{
    FileInfo file;
    if (FilesToParseQueue.TryDequeue(out file))
    {
        factory.StartNew(() => ParseFile(file));
    }
    else
    {
        break;
    }
}
You would probably get even better performance if you created just one Task per core and processed the files in a loop inside each of those Tasks.
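A minimal sketch of that one-task-per-core idea, written as C# 9+ top-level statements. Integers stand in for FileInfo, and ParseItem is a hypothetical placeholder for the question's ParseFile. Each worker exits as soon as TryDequeue finds the queue empty, so with files arriving over time you would start a new round of workers per timer tick (or switch to a blocking queue).

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

// Integers stand in for FileInfo objects.
var queue = new ConcurrentQueue<int>(Enumerable.Range(0, 1000));
var handled = new ConcurrentBag<int>();

// Hypothetical stand-in for the question's ParseFile(FileInfo).
void ParseItem(int item) => handled.Add(item);

// Exactly one worker per core; each keeps dequeuing until the shared queue is empty.
Task[] workers = Enumerable.Range(0, Environment.ProcessorCount)
    .Select(_ => Task.Run(() =>
    {
        while (queue.TryDequeue(out int item))
        {
            ParseItem(item);
        }
    }))
    .ToArray();

Task.WaitAll(workers);
Console.WriteLine($"{workers.Length} workers handled {handled.Count} items");
```

The number of Task objects is fixed at the core count no matter how many files are queued, which is exactly the bound the original loop lacks.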
Basic solution
You can actually fix your code by stripping it down to the bare essentials, like this:
// This is technically a misnomer. It should be
// called "FileNamesQueuedForProcessing" or similar.
// Non-thread-safe. Assuming timer callback access only.
private readonly HashSet<string> ProcessedFiles = new HashSet<string>();
private readonly LimitedConcurrencyLevelTaskScheduler LimitedConcurrencyScheduler = new LimitedConcurrencyLevelTaskScheduler(Environment.ProcessorCount);

private void OnTimedEvent(object source, ElapsedEventArgs e)
{
    DirectoryInfo info = new DirectoryInfo(AssemblyDirectory);

    // Slightly rewritten to cut down on allocations.
    FileInfo[] newFilesToParse = info
        .GetFiles("*.dat")
        .Where(f =>
            (DateTime.Now - f.CreationTime) > TimeSpan.FromSeconds(60) && // I'd consider removing this filter.
            !ProcessedFiles.Contains(f.Name))
        .OrderBy(p => p.CreationTime)
        .ToArray();

    // It's an array, so use Length rather than the LINQ Count() extension.
    if (newFilesToParse.Length != 0) Console.WriteLine("Adding " + newFilesToParse.Length + " files to the Queue");

    foreach (FileInfo file in newFilesToParse)
    {
        // Fire and forget.
        // You can add the resulting task to a shared thread-safe collection
        // if you want to observe completion/exceptions/cancellations.
        Task.Factory.StartNew(
            () => ParseFile(file)
            , CancellationToken.None
            , TaskCreationOptions.DenyChildAttach
            , LimitedConcurrencyScheduler
        );

        ProcessedFiles.Add(file.Name);
    }
}
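Note that I'm not doing any kind of load balancing myself. Instead, I rely on LimitedConcurrencyLevelTaskScheduler to do what it advertises: accept every work item immediately on Task.Factory.StartNew, queue the items internally, and process them at some point in the future on at most N thread-pool threads (N = the maximum degree of parallelism).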
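LimitedConcurrencyLevelTaskScheduler is not part of the base class library; it comes from the sample in the TaskScheduler documentation (and the ParallelExtensionsExtras samples). If you would rather not copy that class in, the built-in ConcurrentExclusiveSchedulerPair (.NET 4.5 and later) offers a similar cap through its ConcurrentScheduler. This is my substitution, not what the answer uses. The sketch (C# 9+ top-level statements) queues 20 dummy work items up front and records the peak number running at once, which should never exceed the configured limit. FakeParse is a hypothetical stand-in for ParseFile.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

const int maxParallelism = 2;

// The pair's ConcurrentScheduler runs at most maxParallelism tasks at a time
// and keeps the rest queued internally.
var pair = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, maxParallelism);

int running = 0;
int maxObserved = 0;

// Hypothetical stand-in for ParseFile: tracks how many calls overlap.
void FakeParse()
{
    int now = Interlocked.Increment(ref running);
    int seen;
    // Lock-free "max" update: retry until maxObserved >= now.
    while (now > (seen = Volatile.Read(ref maxObserved)) &&
           Interlocked.CompareExchange(ref maxObserved, now, seen) != seen)
    {
    }
    Thread.Sleep(20); // pretend to parse a file
    Interlocked.Decrement(ref running);
}

// Queue all work items immediately, as the answer does with its scheduler.
Task[] tasks = Enumerable.Range(0, 20)
    .Select(_ => Task.Factory.StartNew(
        FakeParse,
        CancellationToken.None,
        TaskCreationOptions.DenyChildAttach,
        pair.ConcurrentScheduler))
    .ToArray();

Task.WaitAll(tasks);
Console.WriteLine($"peak concurrency: {maxObserved} (limit {maxParallelism})");
```

The backlog of 20 items is held by the scheduler rather than by 20 busy threads, which is the same division of labour the answer relies on.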
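P.S. I'm assuming that OnTimedEvent never runs concurrently with itself (for example, because it always fires on the same thread). If that isn't the case, a small change is needed to make it thread-safe. Keep in mind that System.Timers.Timer raises Elapsed on a thread-pool thread unless SynchronizingObject is set, so callbacks can overlap if one tick takes longer than the interval: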
private void OnTimedEvent(object source, ElapsedEventArgs e)
{
    lock (ProcessedFiles)
    {
        // As above.
    }
}
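Alternative
Now, here's a slightly more novel approach: how about we get rid of the timer and LimitedConcurrencyLevelTaskScheduler and encapsulate all of the processing in a single, modular pipeline? There will be a lot of blocking code (unless you break out TPL Dataflow, but I'll stick with base class library types here), but message passing between the stages is so easy that it makes for a really appealing design (in my opinion, of course).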
private async Task PipelineAsync()
{
    const int MAX_FILES_TO_BE_QUEUED = 16;

    using (BlockingCollection<FileInfo> queue = new BlockingCollection<FileInfo>(boundedCapacity: MAX_FILES_TO_BE_QUEUED))
    {
        Task producer = Task.Run(async () =>
        {
            try
            {
                // Created once, outside the polling loop, so it remembers files queued
                // on earlier iterations. Only the producer touches it.
                HashSet<string> namesOfFilesQueuedForProcessing = new HashSet<string>();

                while (true)
                {
                    DirectoryInfo info = new DirectoryInfo(AssemblyDirectory);

                    FileInfo[] newFilesToParse = info
                        .GetFiles("*.dat")
                        .Where(f =>
                            (DateTime.Now - f.CreationTime) > TimeSpan.FromSeconds(60) &&
                            !namesOfFilesQueuedForProcessing.Contains(f.Name))
                        .OrderBy(p => p.CreationTime) // Processing order is not guaranteed.
                        .ToArray();

                    foreach (FileInfo file in newFilesToParse)
                    {
                        // This will block if we reach bounded capacity, thereby throttling
                        // the producer (meaning we'll never overflow the handover collection).
                        queue.Add(file);
                        namesOfFilesQueuedForProcessing.Add(file.Name);
                    }

                    await Task.Delay(TimeSpan.FromSeconds(60)).ConfigureAwait(false);
                }
            }
            finally
            {
                // Exception? Cancellation? We'll let the
                // consumer know that it can wind down.
                queue.CompleteAdding();
            }
        });

        Task consumer = Task.Run(() =>
        {
            ParallelOptions options = new ParallelOptions {
                MaxDegreeOfParallelism = Environment.ProcessorCount
            };

            // NoBuffering (System.Collections.Concurrent) stops Parallel.ForEach from
            // pulling items in chunks, so each file starts as soon as it is dequeued.
            Parallel.ForEach(
                Partitioner.Create(queue.GetConsumingEnumerable(), EnumerablePartitionerOptions.NoBuffering),
                options,
                file => ParseFile(file));
        });

        await Task.WhenAll(producer, consumer).ConfigureAwait(false);
    }
}
The general form of this pattern is described on page 55 of Stephen Toub's "Patterns of Parallel Programming". I highly recommend taking a look.
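The trade-off here is the amount of blocking you'll be doing because of BlockingCollection<T> and Parallel.ForEach. The benefits of a pipeline as a concept are numerous, though: new stages (Task instances) are easy to add, completion and cancellation are easy to wire up, exceptions from both the producer and the consumer are observed, and all mutable state is pleasantly local.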
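To illustrate how cancellation can be wired into such a pipeline, here is a stripped-down sketch (C# 9+ top-level statements) in which integers stand in for files and a 10 ms delay stands in for the 60-second poll. The CancellationTokenSource is my addition; the pipeline above has none. The token goes to both the blocking Add and Task.Delay, and the finally block still calls CompleteAdding, so the consumer drains whatever is left and exits normally while the producer ends up cancelled.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

using var cts = new CancellationTokenSource();
using var queue = new BlockingCollection<int>(boundedCapacity: 4);
int consumed = 0;

Task producer = Task.Run(async () =>
{
    try
    {
        for (int batch = 0; ; batch++)
        {
            // Stand-in for "scan the directory": three new items per poll.
            for (int i = 0; i < 3; i++)
                queue.Add(batch * 3 + i, cts.Token); // blocks while the queue is full
            await Task.Delay(10, cts.Token);         // stand-in for the 60 s poll
        }
    }
    finally
    {
        queue.CompleteAdding(); // tell the consumer no more items are coming
    }
});

Task consumer = Task.Run(() =>
{
    // Ends once CompleteAdding has been called and the queue is drained.
    foreach (int item in queue.GetConsumingEnumerable())
        Interlocked.Increment(ref consumed);
});

cts.CancelAfter(200);
try { await Task.WhenAll(producer, consumer); }
catch (OperationCanceledException) { /* expected: the producer was cancelled */ }

Console.WriteLine($"producer cancelled: {producer.IsCanceled}, consumer: {consumer.Status}, consumed: {consumed}");
```

Because the OperationCanceledException escapes the async lambda, the producer task ends up Canceled rather than Faulted, and a caller awaiting the pipeline sees a normal cancellation.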
This kind of problem (where you queue up multiple units of work and want them processed in parallel) is a great fit for TPL Dataflow:
private async void OnTimedEvent(object source, ElapsedEventArgs e)
{
    DirectoryInfo info = new DirectoryInfo(AssemblyDirectory);
    FileInfo[] allSrcFiles = info.GetFiles("*.dat").OrderBy(p => p.CreationTime).ToArray();
    var validSrcFiles = allSrcFiles.Where(p => (DateTime.Now - p.CreationTime) > TimeSpan.FromSeconds(60));
    var newFilesToParse = validSrcFiles.Where(f => !ProcessedFiles.Contains(f.Name));
    if (newFilesToParse.Any()) Console.WriteLine("Adding " + newFilesToParse.Count() + " files to the Queue");

    var blockOptions = new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = coresCount,
    };
    var block = new ActionBlock<FileInfo>(ParseFile, blockOptions);

    var filesToParseCount = 0;
    foreach (var file in newFilesToParse)
    {
        block.Post(file);
        ProcessedFiles.Add(file.Name);
        ++filesToParseCount;
    }

    Console.WriteLine("There are " + filesToParseCount + " files in queue. Processing...");

    block.Complete();
    await block.Completion;

    Console.WriteLine("Finished processing Files in the Queue. Waiting for new files...");
}
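One thing to be aware of in the Dataflow version: an ActionBlock is unbounded by default, so Post accepts every item immediately and the backlog lives in memory. That is harmless for a directory listing, but if you want backpressure you can set BoundedCapacity and use SendAsync, which waits for room (with a bounded block, Post simply returns false when the block is full). A minimal sketch, assuming the System.Threading.Tasks.Dataflow package is available to the project and C# 9+ top-level statements; integers and a short delay stand in for files and ParseFile:

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // System.Threading.Tasks.Dataflow package

int processed = 0;
int maxBacklog = 0;

var block = new ActionBlock<int>(async item =>
{
    await Task.Delay(5); // pretend to parse a file
    Interlocked.Increment(ref processed);
},
new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = Environment.ProcessorCount,
    // At most 8 items may be buffered or in flight at any moment.
    BoundedCapacity = 8,
});

foreach (int item in Enumerable.Range(0, 100))
{
    // SendAsync waits asynchronously for space instead of rejecting the item.
    bool accepted = await block.SendAsync(item);
    if (!accepted) break; // the block was completed or faulted
    maxBacklog = Math.Max(maxBacklog, block.InputCount);
}

block.Complete();
await block.Completion;
Console.WriteLine($"processed {processed} items, peak input backlog {maxBacklog}");
```

The producer now slows down to the block's pace instead of piling items into memory, which is the same throttling the BlockingCollection pipeline gets from its bounded capacity.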