Asynchronously Enumerate Folders

I'm trying to implement a general-purpose file system crawler that can, for example, enumerate all subfolders starting from a given root directory. I'd like to do this using the async/await/Task paradigm.

Below is my current code. It works, but I suspect it can be improved. In particular, the Task.WaitAll flagged in the code comment causes unnecessary waiting in deep directory trees, because the loop pauses to wait at every level of the tree instead of immediately continuing with the new folders that have been added to folderQueue.

What I'd like is to somehow include the new folders added to folderQueue in the Task.WaitAll() while that WaitAll is still in progress. Is that possible?

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Threading.Tasks;

class FileSystemCrawlerSO
{
    static void Main(string[] args)
    {
        FileSystemCrawlerSO crawler = new FileSystemCrawlerSO();
        Stopwatch watch = new Stopwatch();
        watch.Start();
        crawler.CollectFolders(@"d:\www");
        watch.Stop();
        Console.WriteLine($"Collected {crawler.NumFolders:N0} folders in {watch.ElapsedMilliseconds} milliseconds.");
        if (Debugger.IsAttached)
            Console.ReadKey();
    }


    public int NumFolders { get; set; }

    private readonly Queue<DirectoryInfo> folderQueue;


    public FileSystemCrawlerSO()
    {
        folderQueue = new Queue<DirectoryInfo>();
    }


    public void CollectFolders(string path)
    {
        DirectoryInfo directoryInfo = new DirectoryInfo(path);
        lock (folderQueue)
           folderQueue.Enqueue(directoryInfo);
        List<Task> tasks = new List<Task>();
        do
        {
            tasks.Clear();
            lock (folderQueue)
            {
                while (folderQueue.Any())
                {
                    var folder = folderQueue.Dequeue();
                    Task task = Task.Run(() => CrawlFolder(folder));
                    tasks.Add(task);
                }
            }
            if (tasks.Any())
            {
                Console.WriteLine($"Waiting for {tasks.Count} tasks...");
                Task.WaitAll(tasks.ToArray()); //<== NOTE: THIS IS NOT OPTIMAL
            }
        } while (tasks.Any());
    }


    private void CrawlFolder(DirectoryInfo dir)
    {
        try
        {
            DirectoryInfo[] directoryInfos = dir.GetDirectories();
            lock (folderQueue)
                foreach (DirectoryInfo childInfo in directoryInfos)
                    folderQueue.Enqueue(childInfo);
            // Do something with the current folder
            // e.g. Console.WriteLine($"{dir.FullName}");
            NumFolders++;
        }
        catch (Exception ex)
        {
            while (ex != null)
            {
                Console.WriteLine($"{ex.GetType()} {ex.Message}\n{ex.StackTrace}");
                ex = ex.InnerException;
            }
        }
    }
}

Here is my suggestion. I use the generic Concurrent*<> classes, so I don't have to deal with the locking myself (although that alone doesn't automatically improve performance).

Then I start a task for each folder and add it to a ConcurrentBag<Task>. After starting the first task, I keep waiting on whichever task I can take from the bag; once no task is left to wait for, I'm done.

using System;
using System.Collections.Concurrent;
using System.IO;
using System.Threading.Tasks;

public class FileSystemCrawlerSO
{
    public int NumFolders { get; set; }
    private readonly ConcurrentQueue<DirectoryInfo> folderQueue = new ConcurrentQueue<DirectoryInfo>();
    private readonly ConcurrentBag<Task> tasks = new ConcurrentBag<Task>();

    public void CollectFolders(string path)
    {

        DirectoryInfo directoryInfo = new DirectoryInfo(path);
        tasks.Add(Task.Run(() => CrawlFolder(directoryInfo)));

        Task taskToWaitFor;
        while (tasks.TryTake(out taskToWaitFor))
            taskToWaitFor.Wait();
    }


    private void CrawlFolder(DirectoryInfo dir)
    {
        try
        {
            DirectoryInfo[] directoryInfos = dir.GetDirectories();
            foreach (DirectoryInfo childInfo in directoryInfos)
            {
                // here may be dragons using enumeration variable as closure!!
                DirectoryInfo di = childInfo;
                tasks.Add(Task.Run(() => CrawlFolder(di)));
            }
            // Do something with the current folder
            // e.g. Console.WriteLine($"{dir.FullName}");
            NumFolders++;
        }
        catch(Exception ex)
        {
            while (ex != null)
            {
                Console.WriteLine($"{ex.GetType()} {ex.Message}\n{ex.StackTrace}");
                ex = ex.InnerException;
            }
        }
    }
}
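
For completeness, a minimal driver for this class might look like the sketch below; it isn't part of the answer, just an assumed way of calling it that mirrors the Main method from the question.

using System;
using System.Diagnostics;

class Program
{
    static void Main(string[] args)
    {
        var crawler = new FileSystemCrawlerSO();
        var watch = Stopwatch.StartNew();
        crawler.CollectFolders(@"d:\www");   // blocks until all crawl tasks have finished
        watch.Stop();
        Console.WriteLine($"Collected {crawler.NumFolders:N0} folders in {watch.ElapsedMilliseconds} milliseconds.");
    }
}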

I haven't measured whether this is faster than your solution, but I think (as Yacoub Massad has said) the bottleneck will be the IO system itself, not the way you organize the tasks.

In theory, async/await should help here. In practice, it doesn't help much, because Win32 does not expose asynchronous APIs for the directory functions (or for some file functions, such as opening a file).

Furthermore, parallelizing disk access with multiple threads (Task.Run) tends to be counterproductive, especially for traditional (non-SSD) disks. Parallel file system access (as opposed to serial access) tends to cause disk thrashing, which reduces overall throughput.

So, in the general case, I recommend just using the blocking directory enumeration methods. For example:

using System;
using System.Diagnostics;
using System.IO;

class FileSystemCrawlerSO
{
  static void Main(string[] args)
  {
    var numFolders = 0;
    Stopwatch watch = new Stopwatch();
    watch.Start();
    foreach (var dir in Directory.EnumerateDirectories(@"d:\www", "*", SearchOption.AllDirectories))
    {
      // Do something with the current folder
      // e.g. Console.WriteLine($"{dir.FullName}");
      ++numFolders;
    }
    watch.Stop();
    Console.WriteLine($"Collected {numFolders:N0} folders in {watch.ElapsedMilliseconds} milliseconds.");
    if (Debugger.IsAttached)
        Console.ReadKey();
  }
}

A nice side effect of this simple approach is that there is no longer a race condition on the folder counter variable (NumFolders).
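
As an aside, if you do stay with one of the multithreaded versions, that race can be removed by incrementing an int field with Interlocked.Increment instead of NumFolders++. A minimal sketch (not code from either version above):

using System.Threading;

public class FolderCounter
{
    private int numFolders; // Interlocked needs a field, not an auto-property

    public int NumFolders => numFolders;

    // Safe to call concurrently from many crawl tasks; the increment is atomic.
    public void CountOne() => Interlocked.Increment(ref numFolders);
}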

For a console application, that's all you need. If you want to put this into a UI application without blocking the UI thread, a single Task.Run should be enough.
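
For example, in a WinForms-style event handler it could look roughly like this; this is only a sketch, and the ScanButton_Click handler and countLabel control are hypothetical names, not part of the original code:

private async void ScanButton_Click(object sender, EventArgs e)
{
    // Run the blocking enumeration on a thread-pool thread so the UI stays responsive.
    // Count() comes from System.Linq.
    int numFolders = await Task.Run(() =>
        Directory.EnumerateDirectories(@"d:\www", "*", SearchOption.AllDirectories).Count());

    countLabel.Text = $"Collected {numFolders:N0} folders."; // back on the UI thread here
}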

Separate crawling from processing

Try a producer-consumer pattern.
Here is an approach that crawls the directories on one thread and processes them on another.

using System.Collections.Concurrent;
using System.IO;
using System.Threading.Tasks;

public class Program
{
    private readonly BlockingCollection<DirectoryInfo> collection = new BlockingCollection<DirectoryInfo>();

    public void Run()
    {
        Task.Factory.StartNew(() => CollectFolders(@"d:\www"));

        foreach (var dir in collection.GetConsumingEnumerable())
        {
            // Do something with the current folder
            // e.g. Console.WriteLine($"{dir.FullName}");
        }
    }

    public void CollectFolders(string path)
    {
        try
        {
            foreach (var dir in new DirectoryInfo(path).EnumerateDirectories("*", SearchOption.AllDirectories))
            {
                collection.Add(dir);
            }
        }
        finally
        {
            collection.CompleteAdding();
        }
    }
}

Faster

If the processing is slower than the crawling, you may want to use Parallel.ForEach.

Parallel.ForEach(collection.GetConsumingEnumerable(), dir =>
{
    // Do something with the current folder
    // e.g. Console.WriteLine($"{dir.FullName}");
});
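
If you need to cap the parallelism (for example so the processing doesn't overwhelm the disk or another downstream resource), Parallel.ForEach also accepts a ParallelOptions argument; a sketch with an assumed limit of 4:

var options = new ParallelOptions { MaxDegreeOfParallelism = 4 }; // tune for your workload

Parallel.ForEach(collection.GetConsumingEnumerable(), options, dir =>
{
    // Do something with the current folder
    // e.g. Console.WriteLine($"{dir.FullName}");
});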