异步枚举文件夹
Asynchronously Enumerate Folders
我正在尝试实现一个通用文件系统爬虫,例如,它能够枚举从给定根目录开始的所有子文件夹。我想使用 async/await/Task 范式来做到这一点。
下面是我目前的代码。它有效,但我怀疑它可以改进。特别是,带注释的 Task.WaitAll
导致在深目录树中不必要的等待,因为循环在每个树级别暂停等待,而不是立即继续处理添加到 folderQueue
的新文件夹。
我想以某种方式将添加到 folderQueue
的新文件夹包含在 Task.WaitAll()
而 WaitAll
正在进行中。这可能吗?
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
class FileSystemCrawlerSO
{
static void Main(string[] args)
{
FileSystemCrawlerSO crawler = new FileSystemCrawlerSO();
Stopwatch watch = new Stopwatch();
watch.Start();
crawler.CollectFolders(@"d:\www");
watch.Stop();
Console.WriteLine($"Collected {crawler.NumFolders:N0} folders in {watch.ElapsedMilliseconds} milliseconds.");
if (Debugger.IsAttached)
Console.ReadKey();
}
public int NumFolders { get; set; }
private readonly Queue<DirectoryInfo> folderQueue;
public FileSystemCrawlerSO()
{
folderQueue = new Queue<DirectoryInfo>();
}
public void CollectFolders(string path)
{
DirectoryInfo directoryInfo = new DirectoryInfo(path);
lock (folderQueue)
folderQueue.Enqueue(directoryInfo);
List<Task> tasks = new List<Task>();
do
{
tasks.Clear();
lock (folderQueue)
{
while (folderQueue.Any())
{
var folder = folderQueue.Dequeue();
Task task = Task.Run(() => CrawlFolder(folder));
tasks.Add(task);
}
}
if (tasks.Any())
{
Console.WriteLine($"Waiting for {tasks.Count} tasks...");
Task.WaitAll(tasks.ToArray()); //<== NOTE: THIS IS NOT OPTIMAL
}
} while (tasks.Any());
}
private void CrawlFolder(DirectoryInfo dir)
{
try
{
DirectoryInfo[] directoryInfos = dir.GetDirectories();
lock (folderQueue)
foreach (DirectoryInfo childInfo in directoryInfos)
folderQueue.Enqueue(childInfo);
// Do something with the current folder
// e.g. Console.WriteLine($"{dir.FullName}");
NumFolders++;
}
catch (Exception ex)
{
while (ex != null)
{
Console.WriteLine($"{ex.GetType()} {ex.Message}\n{ex.StackTrace}");
ex = ex.InnerException;
}
}
}
}
这是我的建议。我使用通用 Concurrent*<>
类,所以我不必自己处理锁(尽管这不会自动提高性能)。
然后我为每个文件夹启动一个任务并在 ConcurrentBag<Task>
中排队。开始第一个任务后,我总是等待包中的第一个任务,如果没有其他任务等待,我就完成了。
public class FileSystemCrawlerSO
{
public int NumFolders { get; set; }
private readonly ConcurrentQueue<DirectoryInfo> folderQueue = new ConcurrentQueue<DirectoryInfo>();
private readonly ConcurrentBag<Task> tasks = new ConcurrentBag<Task>();
public void CollectFolders(string path)
{
DirectoryInfo directoryInfo = new DirectoryInfo(path);
tasks.Add(Task.Run(() => CrawlFolder(directoryInfo)));
Task taskToWaitFor;
while (tasks.TryTake(out taskToWaitFor))
taskToWaitFor.Wait();
}
private void CrawlFolder(DirectoryInfo dir)
{
try
{
DirectoryInfo[] directoryInfos = dir.GetDirectories();
foreach (DirectoryInfo childInfo in directoryInfos)
{
// here may be dragons using enumeration variable as closure!!
DirectoryInfo di = childInfo;
tasks.Add(Task.Run(() => CrawlFolder(di)));
}
// Do something with the current folder
// e.g. Console.WriteLine($"{dir.FullName}");
NumFolders++;
}
catch(Exception ex)
{
while (ex != null)
{
Console.WriteLine($"{ex.GetType()} {ex.Message}\n{ex.StackTrace}");
ex = ex.InnerException;
}
}
}
}
我还没有测量这是否比您的解决方案更快。但我认为(正如 Yacoub Massad 所说), 瓶颈将是 IO 系统本身,而不是你组织任务的方式。
理论上,async/await应该能帮到这里。实际上,并没有那么多。这是因为 Win32 不公开用于目录函数(或某些文件函数,例如打开文件)的异步 API。
此外,使用多线程 (Task.Run
) 并行化磁盘访问往往会适得其反,尤其是对于传统 (non-SSD) 磁盘。并行文件系统访问(与串行文件系统访问相反)往往会导致磁盘抖动,降低 总体吞吐量。
所以,在一般情况下,我建议只使用阻塞目录枚举方法。例如:
class FileSystemCrawlerSO
{
static void Main(string[] args)
{
var numFolders = 0;
Stopwatch watch = new Stopwatch();
watch.Start();
foreach (var dir in Directory.EnumerateDirectories(@"d:\www", "*", SearchOption.AllDirectories))
{
// Do something with the current folder
// e.g. Console.WriteLine($"{dir.FullName}");
++numFolders;
}
watch.Stop();
Console.WriteLine($"Collected {numFolders:N0} folders in {watch.ElapsedMilliseconds} milliseconds.");
if (Debugger.IsAttached)
Console.ReadKey();
}
}
使用简单方法的一个很好的副作用是文件夹计数器变量 (NumFolders
) 上不再存在竞争条件。
对于控制台应用程序,这就是您需要做的全部。如果要将其放入 UI 应用程序并且您不想阻塞 UI 线程,那么 single Task.Run
应该够了。
分开抓取和处理
尝试使用生产者-消费者模式。
这是一种在一个线程中抓取目录并在另一个线程中处理的方法。
public class Program
{
private readonly BlockingCollection<DirectoryInfo> collection = new BlockingCollection<DirectoryInfo>();
public void Run()
{
Task.Factory.StartNew(() => CollectFolders(@"d:\www"));
foreach (var dir in collection.GetConsumingEnumerable())
{
// Do something with the current folder
// e.g. Console.WriteLine($"{dir.FullName}");
}
}
public void CollectFolders(string path)
{
try
{
foreach (var dir in new DirectoryInfo(path).EnumerateDirectories("*", SearchOption.AllDirectories))
{
collection.Add(dir);
}
}
finally
{
collection.CompleteAdding();
}
}
}
更快
如果处理速度比抓取慢,您可能需要使用 Parallel.ForEach.
Parallel.ForEach(collection.GetConsumingEnumerable(), dir =>
{
// Do something with the current folder
// e.g. Console.WriteLine($"{dir.FullName}");
});
我正在尝试实现一个通用文件系统爬虫,例如,它能够枚举从给定根目录开始的所有子文件夹。我想使用 async/await/Task 范式来做到这一点。
下面是我目前的代码。它有效,但我怀疑它可以改进。特别是,带注释的 Task.WaitAll
导致在深目录树中不必要的等待,因为循环在每个树级别暂停等待,而不是立即继续处理添加到 folderQueue
的新文件夹。
我想以某种方式将添加到 folderQueue
的新文件夹包含在 Task.WaitAll()
而 WaitAll
正在进行中。这可能吗?
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
class FileSystemCrawlerSO
{
static void Main(string[] args)
{
FileSystemCrawlerSO crawler = new FileSystemCrawlerSO();
Stopwatch watch = new Stopwatch();
watch.Start();
crawler.CollectFolders(@"d:\www");
watch.Stop();
Console.WriteLine($"Collected {crawler.NumFolders:N0} folders in {watch.ElapsedMilliseconds} milliseconds.");
if (Debugger.IsAttached)
Console.ReadKey();
}
public int NumFolders { get; set; }
private readonly Queue<DirectoryInfo> folderQueue;
public FileSystemCrawlerSO()
{
folderQueue = new Queue<DirectoryInfo>();
}
public void CollectFolders(string path)
{
DirectoryInfo directoryInfo = new DirectoryInfo(path);
lock (folderQueue)
folderQueue.Enqueue(directoryInfo);
List<Task> tasks = new List<Task>();
do
{
tasks.Clear();
lock (folderQueue)
{
while (folderQueue.Any())
{
var folder = folderQueue.Dequeue();
Task task = Task.Run(() => CrawlFolder(folder));
tasks.Add(task);
}
}
if (tasks.Any())
{
Console.WriteLine($"Waiting for {tasks.Count} tasks...");
Task.WaitAll(tasks.ToArray()); //<== NOTE: THIS IS NOT OPTIMAL
}
} while (tasks.Any());
}
private void CrawlFolder(DirectoryInfo dir)
{
try
{
DirectoryInfo[] directoryInfos = dir.GetDirectories();
lock (folderQueue)
foreach (DirectoryInfo childInfo in directoryInfos)
folderQueue.Enqueue(childInfo);
// Do something with the current folder
// e.g. Console.WriteLine($"{dir.FullName}");
NumFolders++;
}
catch (Exception ex)
{
while (ex != null)
{
Console.WriteLine($"{ex.GetType()} {ex.Message}\n{ex.StackTrace}");
ex = ex.InnerException;
}
}
}
}
这是我的建议。我使用通用 Concurrent*<>
类,所以我不必自己处理锁(尽管这不会自动提高性能)。
然后我为每个文件夹启动一个任务并在 ConcurrentBag<Task>
中排队。开始第一个任务后,我总是等待包中的第一个任务,如果没有其他任务等待,我就完成了。
public class FileSystemCrawlerSO
{
public int NumFolders { get; set; }
private readonly ConcurrentQueue<DirectoryInfo> folderQueue = new ConcurrentQueue<DirectoryInfo>();
private readonly ConcurrentBag<Task> tasks = new ConcurrentBag<Task>();
public void CollectFolders(string path)
{
DirectoryInfo directoryInfo = new DirectoryInfo(path);
tasks.Add(Task.Run(() => CrawlFolder(directoryInfo)));
Task taskToWaitFor;
while (tasks.TryTake(out taskToWaitFor))
taskToWaitFor.Wait();
}
private void CrawlFolder(DirectoryInfo dir)
{
try
{
DirectoryInfo[] directoryInfos = dir.GetDirectories();
foreach (DirectoryInfo childInfo in directoryInfos)
{
// here may be dragons using enumeration variable as closure!!
DirectoryInfo di = childInfo;
tasks.Add(Task.Run(() => CrawlFolder(di)));
}
// Do something with the current folder
// e.g. Console.WriteLine($"{dir.FullName}");
NumFolders++;
}
catch(Exception ex)
{
while (ex != null)
{
Console.WriteLine($"{ex.GetType()} {ex.Message}\n{ex.StackTrace}");
ex = ex.InnerException;
}
}
}
}
我还没有测量这是否比您的解决方案更快。但我认为(正如 Yacoub Massad 所说), 瓶颈将是 IO 系统本身,而不是你组织任务的方式。
理论上,async/await应该能帮到这里。实际上,并没有那么多。这是因为 Win32 不公开用于目录函数(或某些文件函数,例如打开文件)的异步 API。
此外,使用多线程 (Task.Run
) 并行化磁盘访问往往会适得其反,尤其是对于传统 (non-SSD) 磁盘。并行文件系统访问(与串行文件系统访问相反)往往会导致磁盘抖动,降低 总体吞吐量。
所以,在一般情况下,我建议只使用阻塞目录枚举方法。例如:
class FileSystemCrawlerSO
{
static void Main(string[] args)
{
var numFolders = 0;
Stopwatch watch = new Stopwatch();
watch.Start();
foreach (var dir in Directory.EnumerateDirectories(@"d:\www", "*", SearchOption.AllDirectories))
{
// Do something with the current folder
// e.g. Console.WriteLine($"{dir.FullName}");
++numFolders;
}
watch.Stop();
Console.WriteLine($"Collected {numFolders:N0} folders in {watch.ElapsedMilliseconds} milliseconds.");
if (Debugger.IsAttached)
Console.ReadKey();
}
}
使用简单方法的一个很好的副作用是文件夹计数器变量 (NumFolders
) 上不再存在竞争条件。
对于控制台应用程序,这就是您需要做的全部。如果要将其放入 UI 应用程序并且您不想阻塞 UI 线程,那么 single Task.Run
应该够了。
分开抓取和处理
尝试使用生产者-消费者模式。
这是一种在一个线程中抓取目录并在另一个线程中处理的方法。
public class Program
{
private readonly BlockingCollection<DirectoryInfo> collection = new BlockingCollection<DirectoryInfo>();
public void Run()
{
Task.Factory.StartNew(() => CollectFolders(@"d:\www"));
foreach (var dir in collection.GetConsumingEnumerable())
{
// Do something with the current folder
// e.g. Console.WriteLine($"{dir.FullName}");
}
}
public void CollectFolders(string path)
{
try
{
foreach (var dir in new DirectoryInfo(path).EnumerateDirectories("*", SearchOption.AllDirectories))
{
collection.Add(dir);
}
}
finally
{
collection.CompleteAdding();
}
}
}
更快
如果处理速度比抓取慢,您可能需要使用 Parallel.ForEach.
Parallel.ForEach(collection.GetConsumingEnumerable(), dir =>
{
// Do something with the current folder
// e.g. Console.WriteLine($"{dir.FullName}");
});