在锁内使用线程池和线程完成更新

Using threadpool with thread completion updates inside a lock

按照有关线程应用程序开发的几个教程,我实现了一个小型实践实用程序,它使用 Task.Run 和 Threadpool.QueueUserWorkItem 递归索引文件夹并生成所有包含文件的校验和清单.

虽然应用程序运行良好,但我无法在专门使用 threadpool 的部分更新 UI。 UI 在使用 Task.Run.

时使用 await Application.Current.Dispatcher.BeginInvoke 正确更新

期望的行为是随着每个线程池的线程完成各自的任务,进度在 ProcessChecksums 内继续更新。

int queuedThreads = 0;
object locker = new object();
CancellationTokenSource cancellationTokenSource;
List<string> lstFilePaths = new List<string>();
Manifest manifestData;
event Action<double> OnProgressChange;
event Action<string> OnStatusChange;
    
async void GenerateManifest(Object sender, RoutedEventArgs e) {
    status = "Indexing Files";
    progress = 1;

    cancellationTokenSource = new CancellationTokenSource();

    await Task.Run(() => Async_GenerateManifest(cancellationTokenSource.Token), cancellationTokenSource.Token);

    if (!cancellationTokenSource.Token.IsCancellationRequested) {
        Finished();
    }else{
        Cancelled();
    }
}
    
async Task Async_GenerateManifest(CancellationToken cancellationToken) {
    if (cancellationToken.IsCancellationRequested) { return; }

    Async_IndexFiles(initialpath, cancellationToken);
        
    //Works Perfectly
    await Application.Current.Dispatcher.BeginInvoke(DispatcherPriority.Background, new Action(() => {
        OnStatusChange("Generating Checksums");
        OnProgressChange(10);
    }));

    ProcessChecksums(cancellationToken);

    //Works Perfectly
    await Application.Current.Dispatcher.BeginInvoke(DispatcherPriority.Background, new Action(() => {
        OnStatusChange("Saving Manifest");
        OnProgressChange(90);
    }));

    SaveManifestFile(cancellationToken);
}
    
void ProcessChecksums(CancellationToken cancellationToken) {
    List<FileData> lstFileData = new List<FileData>();

    for (int i = 0; i < lstFilePaths.Count; i++) {
        if (cancellationToken.IsCancellationRequested) { return; }
            
        string s = lstFilePaths[i];
        lock (locker) queuedThreads++;

        ThreadPool.QueueUserWorkItem( x => {
            manifestData.AddFileData(new FileData(s, GenerateChecksum(s)));
        });
    }

    lock (locker) {
        while (queuedThreads > 0) {
            if (cancellationToken.IsCancellationRequested) { return; }=
            Monitor.Wait(locker);
                
            //Can't use await Dispatcher.BeginInvoke as is inside a lock, doesn't update GUI while waiting.
            OnProgressChange((((queuedThreads - lstFilePaths.Count) * -1) / lstFilePaths.Count) - 0.2);
        }
    }
}

string GenerateChecksum(string filePath) {
    //Time-consuming checksum generation
    //...

    lock (locker) {
        queuedThreads--;
        Monitor.Pulse(locker);
    }

    return BitConverter.ToString(checksum);
}

使用后台线程的进度更新来更新 UI 的标准模式是使用 IProgress<T>/Progress<T>。与直接使用 Dispatcher.

相比,更现代的方法有几个好处
// You'd want to set these while on the UI thread.
// E.g., in your constructor:
//   _percentProgress = new Progress<double>(value => ...);
//   _statusProgress = new Progress<string>(value => ...);
IProgress<double> _percentProgress;
IProgress<string> _statusProgress;

async void GenerateManifest(Object sender, RoutedEventArgs e) {
  status = "Indexing Files";
  progress = 1;

  cancellationTokenSource = new CancellationTokenSource();

  await Task.Run(() => GenerateManifest(cancellationTokenSource.Token));

  if (!cancellationTokenSource.Token.IsCancellationRequested) {
    Finished();
  }else{
    Cancelled();
  }
}

void GenerateManifest(CancellationToken cancellationToken) {
  if (cancellationToken.IsCancellationRequested) { return; }

  Async_IndexFiles(initialpath, cancellationToken);

  _statusProgress?.Report("Generating Checksums");
  _percentProgress?.Report(10);

  ProcessChecksums(cancellationToken);

  _statusProgress?.Report("Saving Manifest");
  _percentProgress?.Report(90);

  SaveManifestFile(cancellationToken);
}
    
void ProcessChecksums(CancellationToken cancellationToken) {
  List<FileData> lstFileData = new List<FileData>();

  for (int i = 0; i < lstFilePaths.Count; i++) {
    if (cancellationToken.IsCancellationRequested) { return; }
            
    string s = lstFilePaths[i];
    lock (locker) queuedThreads++;

    ThreadPool.QueueUserWorkItem( x => {
        manifestData.AddFileData(new FileData(s, GenerateChecksum(s)));
    });
  }

  lock (locker) {
    while (queuedThreads > 0) {
      if (cancellationToken.IsCancellationRequested) { return; }
      Monitor.Wait(locker);

      _percentProgress?.Report((((queuedThreads - lstFilePaths.Count) * -1) / lstFilePaths.Count) - 0.2);
    }
  }
}

代码还有一些其他问题。 QueueUserWorkItem 可能应该替换为 Task.Runawait Task.WhenAll 以确保异常不会破坏进程。 if (...) Finished(); else Cancelled(); 可能最好通过将 return 类型更改为 Task 来表示。使用 ThrowIfCancellationRequested 而不是 IsCancellationRequested。还有一种没有 PulseMonitor.Wait 只用于进度更新,这很奇怪;允许该代码报告自己的进度会更清晰。如果您分别查看其中的每一个,您应该会发现您的代码最终会更清晰。