在锁内使用线程池和线程完成更新
Using threadpool with thread completion updates inside a lock
我按照几个关于多线程应用程序开发的教程,实现了一个小型练习工具:它使用 Task.Run 和 ThreadPool.QueueUserWorkItem 递归索引文件夹,并为其中包含的所有文件生成校验和清单。
虽然应用程序运行良好,但在专门使用线程池(ThreadPool)的部分,我无法更新 UI。而在使用 Task.Run 的部分,通过 await Application.Current.Dispatcher.BeginInvoke 可以正确更新 UI。
期望的行为是随着每个线程池的线程完成各自的任务,进度在 ProcessChecksums
内继续更新。
// Events raised with a new status text / progress value.
private event Action<string> OnStatusChange;
private event Action<double> OnProgressChange;

// Source of the token handed to the background work; assigned anew by the
// GenerateManifest event handler each time it runs.
private CancellationTokenSource cancellationTokenSource;

// Paths iterated by ProcessChecksums, and the manifest that receives one
// FileData entry per path.
private List<string> lstFilePaths = new List<string>();
private Manifest manifestData;

// Lock object guarding queuedThreads; also used for Monitor.Wait/Pulse.
private object locker = new object();

// Incremented before each work item is queued, decremented by GenerateChecksum.
private int queuedThreads = 0;
/// <summary>
/// Routed-event handler: resets status and progress, runs
/// <c>Async_GenerateManifest</c> through <c>Task.Run</c>, then calls
/// <c>Finished</c> or <c>Cancelled</c> depending on the token state.
/// </summary>
/// <param name="sender">Event source (not used).</param>
/// <param name="e">Event data (not used).</param>
async void GenerateManifest(Object sender, RoutedEventArgs e) {
    status = "Indexing Files";
    progress = 1;
    cancellationTokenSource = new CancellationTokenSource();

    // Capture the token once so the check after the await inspects the same
    // source that was handed to the background work, even if the field has
    // been reassigned in the meantime.
    CancellationToken token = cancellationTokenSource.Token;

    try {
        await Task.Run(() => Async_GenerateManifest(token), token);
    } catch (OperationCanceledException) when (token.IsCancellationRequested) {
        // Task.Run puts the task in the Canceled state (and the await throws)
        // when the token is cancelled before the delegate starts. This is an
        // async void handler, so the exception must not escape; fall through
        // to the Cancelled() branch below instead.
    }

    if (token.IsCancellationRequested) {
        Cancelled();
    } else {
        Finished();
    }
}
/// <summary>
/// Runs the three phases in order (indexing, checksum generation, saving)
/// and posts a status/progress update to the application's dispatcher before
/// the second and third phase.
/// </summary>
/// <param name="cancellationToken">Checked once on entry and passed on to every phase.</param>
async Task Async_GenerateManifest(CancellationToken cancellationToken) {
    if (cancellationToken.IsCancellationRequested) {
        return;
    }

    // Queues a combined status + progress update on the dispatcher at
    // Background priority and returns the awaitable operation.
    Func<string, double, DispatcherOperation> postUpdate = (text, value) =>
        Application.Current.Dispatcher.BeginInvoke(
            DispatcherPriority.Background,
            new Action(() => {
                OnStatusChange(text);
                OnProgressChange(value);
            }));

    Async_IndexFiles(initialpath, cancellationToken);

    await postUpdate("Generating Checksums", 10);
    ProcessChecksums(cancellationToken);

    await postUpdate("Saving Manifest", 90);
    SaveManifestFile(cancellationToken);
}
/// <summary>
/// Queues one thread-pool work item per entry in <c>lstFilePaths</c>; each
/// item adds a <c>FileData</c> (path + checksum) to <c>manifestData</c>.
/// Then blocks on <c>locker</c> until <c>queuedThreads</c> drops to zero,
/// raising <c>OnProgressChange</c> after every wake-up.
/// </summary>
/// <param name="cancellationToken">
/// Checked before each item is queued and before each wait; the method
/// returns early once cancellation has been requested.
/// </param>
void ProcessChecksums(CancellationToken cancellationToken) {
    for (int i = 0; i < lstFilePaths.Count; i++) {
        if (cancellationToken.IsCancellationRequested) { return; }

        string s = lstFilePaths[i];
        lock (locker) { queuedThreads++; }
        ThreadPool.QueueUserWorkItem(x => {
            manifestData.AddFileData(new FileData(s, GenerateChecksum(s)));
        });
    }

    lock (locker) {
        while (queuedThreads > 0) {
            if (cancellationToken.IsCancellationRequested) { return; }

            // Releases the lock until GenerateChecksum pulses it.
            Monitor.Wait(locker);

            // Fraction of items finished so far. The division must be done
            // in double: int / int truncates to 0 until the very last item
            // completes, so the reported value would never move.
            double completed = (lstFilePaths.Count - queuedThreads) / (double)lstFilePaths.Count;

            // NOTE(review): the neighbouring updates report 10 and 90, while
            // this value lies in [-0.2, 0.8] - confirm the intended scale.
            // NOTE(review): the event is raised on the calling thread while
            // the lock is held; handlers that touch the UI presumably need
            // to marshal to the dispatcher themselves.
            OnProgressChange(completed - 0.2);
        }
    }
}
/// <summary>
/// Produces the checksum string for <paramref name="filePath"/> and signals
/// completion of one queued work item.
/// </summary>
/// <param name="filePath">Path of the file to checksum.</param>
/// <returns>The checksum formatted by <c>BitConverter.ToString</c>.</returns>
string GenerateChecksum(string filePath) {
//Time-consuming checksum generation
//...
// (The actual computation, including the declaration of `checksum`, is elided in this excerpt.)
// Mark one work item as done and wake a thread waiting on the lock.
// NOTE(review): the counter is decremented here, before the caller passes the
// returned value to AddFileData, so a waiter can observe zero before the last
// result has been added - confirm whether that ordering is acceptable.
// NOTE(review): if the elided code throws, this decrement is presumably never
// reached and the waiter would keep waiting - verify.
lock (locker) {
queuedThreads--;
Monitor.Pulse(locker);
}
return BitConverter.ToString(checksum);
}
从后台线程向 UI 报告进度更新的标准模式是使用 IProgress<T>/Progress<T>。与直接使用 Dispatcher 相比,这种更现代的方法有几个好处。
// Progress reporters used by the background work.
// Assign them while on the UI thread, for example in the constructor:
//   _percentProgress = new Progress<double>(value => ...);
//   _statusProgress = new Progress<string>(value => ...);
private IProgress<string> _statusProgress;
private IProgress<double> _percentProgress;
/// <summary>
/// Routed-event handler: resets status and progress, runs the synchronous
/// <c>GenerateManifest(CancellationToken)</c> overload through
/// <c>Task.Run</c>, and afterwards calls <c>Cancelled</c> or <c>Finished</c>
/// according to the token state.
/// </summary>
async void GenerateManifest(Object sender, RoutedEventArgs e) {
    status = "Indexing Files";
    progress = 1;
    cancellationTokenSource = new CancellationTokenSource();

    Task backgroundWork = Task.Run(() => GenerateManifest(cancellationTokenSource.Token));
    await backgroundWork;

    bool wasCancelled = cancellationTokenSource.Token.IsCancellationRequested;
    if (wasCancelled) {
        Cancelled();
    } else {
        Finished();
    }
}
/// <summary>
/// Runs indexing, checksum generation and saving in sequence, reporting a
/// status text and a progress value before the second and third phase.
/// </summary>
/// <param name="cancellationToken">Checked once on entry and passed on to every phase.</param>
void GenerateManifest(CancellationToken cancellationToken) {
    if (cancellationToken.IsCancellationRequested) {
        return;
    }

    // Reports a phase change: status text first, then the progress value.
    // Either reporter may be null, in which case its report is skipped.
    Action<string, double> reportPhase = (text, value) => {
        _statusProgress?.Report(text);
        _percentProgress?.Report(value);
    };

    Async_IndexFiles(initialpath, cancellationToken);

    reportPhase("Generating Checksums", 10);
    ProcessChecksums(cancellationToken);

    reportPhase("Saving Manifest", 90);
    SaveManifestFile(cancellationToken);
}
/// <summary>
/// Queues one thread-pool work item per entry in <c>lstFilePaths</c>; each
/// item adds a <c>FileData</c> (path + checksum) to <c>manifestData</c>.
/// Then blocks on <c>locker</c> until <c>queuedThreads</c> drops to zero,
/// reporting progress through <c>_percentProgress</c> after every wake-up.
/// </summary>
/// <param name="cancellationToken">
/// Checked before each item is queued and before each wait; the method
/// returns early once cancellation has been requested.
/// </param>
void ProcessChecksums(CancellationToken cancellationToken) {
    for (int i = 0; i < lstFilePaths.Count; i++) {
        if (cancellationToken.IsCancellationRequested) { return; }

        string s = lstFilePaths[i];
        lock (locker) { queuedThreads++; }
        ThreadPool.QueueUserWorkItem(x => {
            manifestData.AddFileData(new FileData(s, GenerateChecksum(s)));
        });
    }

    lock (locker) {
        while (queuedThreads > 0) {
            if (cancellationToken.IsCancellationRequested) { return; }

            // Releases the lock until a Monitor.Pulse on the same object.
            Monitor.Wait(locker);

            // Fraction of items finished so far. The division must be done
            // in double: int / int truncates to 0 until the very last item
            // completes, so the reported value would never move.
            double completed = (lstFilePaths.Count - queuedThreads) / (double)lstFilePaths.Count;

            // NOTE(review): the neighbouring reports use 10 and 90, while
            // this value lies in [-0.2, 0.8] - confirm the intended scale.
            _percentProgress?.Report(completed - 0.2);
        }
    }
}
代码还有其他一些问题。QueueUserWorkItem 可能应该替换为 Task.Run 加上 await Task.WhenAll,以确保异常不会导致进程崩溃。if (...) Finished(); else Cancelled(); 这段逻辑可能最好通过把返回类型改为 Task 来表达。应使用 ThrowIfCancellationRequested 而不是 IsCancellationRequested。另外,代码中有一种不配对 Pulse 的 Monitor.Wait,只用于进度更新,这很奇怪;让那段代码自己报告进度会更清晰。如果您逐一处理这些问题,您的代码最终应该会更简洁。
我按照几个关于多线程应用程序开发的教程,实现了一个小型练习工具:它使用 Task.Run 和 ThreadPool.QueueUserWorkItem 递归索引文件夹,并为其中包含的所有文件生成校验和清单。
虽然应用程序运行良好,但在专门使用线程池(ThreadPool)的部分,我无法更新 UI。而在使用 Task.Run 的部分,通过 await Application.Current.Dispatcher.BeginInvoke 可以正确更新 UI。
期望的行为是随着每个线程池的线程完成各自的任务,进度在 ProcessChecksums
内继续更新。
// Events raised with a new status text / progress value.
private event Action<string> OnStatusChange;
private event Action<double> OnProgressChange;

// Source of the token handed to the background work; assigned anew by the
// GenerateManifest event handler each time it runs.
private CancellationTokenSource cancellationTokenSource;

// Paths iterated by ProcessChecksums, and the manifest that receives one
// FileData entry per path.
private List<string> lstFilePaths = new List<string>();
private Manifest manifestData;

// Lock object guarding queuedThreads; also used for Monitor.Wait/Pulse.
private object locker = new object();

// Incremented before each work item is queued, decremented by GenerateChecksum.
private int queuedThreads = 0;
/// <summary>
/// Routed-event handler: resets status and progress, runs
/// <c>Async_GenerateManifest</c> through <c>Task.Run</c>, then calls
/// <c>Finished</c> or <c>Cancelled</c> depending on the token state.
/// </summary>
/// <param name="sender">Event source (not used).</param>
/// <param name="e">Event data (not used).</param>
async void GenerateManifest(Object sender, RoutedEventArgs e) {
    status = "Indexing Files";
    progress = 1;
    cancellationTokenSource = new CancellationTokenSource();

    // Capture the token once so the check after the await inspects the same
    // source that was handed to the background work, even if the field has
    // been reassigned in the meantime.
    CancellationToken token = cancellationTokenSource.Token;

    try {
        await Task.Run(() => Async_GenerateManifest(token), token);
    } catch (OperationCanceledException) when (token.IsCancellationRequested) {
        // Task.Run puts the task in the Canceled state (and the await throws)
        // when the token is cancelled before the delegate starts. This is an
        // async void handler, so the exception must not escape; fall through
        // to the Cancelled() branch below instead.
    }

    if (token.IsCancellationRequested) {
        Cancelled();
    } else {
        Finished();
    }
}
/// <summary>
/// Runs the three phases in order (indexing, checksum generation, saving)
/// and posts a status/progress update to the application's dispatcher before
/// the second and third phase.
/// </summary>
/// <param name="cancellationToken">Checked once on entry and passed on to every phase.</param>
async Task Async_GenerateManifest(CancellationToken cancellationToken) {
    if (cancellationToken.IsCancellationRequested) {
        return;
    }

    // Queues a combined status + progress update on the dispatcher at
    // Background priority and returns the awaitable operation.
    Func<string, double, DispatcherOperation> postUpdate = (text, value) =>
        Application.Current.Dispatcher.BeginInvoke(
            DispatcherPriority.Background,
            new Action(() => {
                OnStatusChange(text);
                OnProgressChange(value);
            }));

    Async_IndexFiles(initialpath, cancellationToken);

    await postUpdate("Generating Checksums", 10);
    ProcessChecksums(cancellationToken);

    await postUpdate("Saving Manifest", 90);
    SaveManifestFile(cancellationToken);
}
/// <summary>
/// Queues one thread-pool work item per entry in <c>lstFilePaths</c>; each
/// item adds a <c>FileData</c> (path + checksum) to <c>manifestData</c>.
/// Then blocks on <c>locker</c> until <c>queuedThreads</c> drops to zero,
/// raising <c>OnProgressChange</c> after every wake-up.
/// </summary>
/// <param name="cancellationToken">
/// Checked before each item is queued and before each wait; the method
/// returns early once cancellation has been requested.
/// </param>
void ProcessChecksums(CancellationToken cancellationToken) {
    for (int i = 0; i < lstFilePaths.Count; i++) {
        if (cancellationToken.IsCancellationRequested) { return; }

        string s = lstFilePaths[i];
        lock (locker) { queuedThreads++; }
        ThreadPool.QueueUserWorkItem(x => {
            manifestData.AddFileData(new FileData(s, GenerateChecksum(s)));
        });
    }

    lock (locker) {
        while (queuedThreads > 0) {
            if (cancellationToken.IsCancellationRequested) { return; }

            // Releases the lock until GenerateChecksum pulses it.
            Monitor.Wait(locker);

            // Fraction of items finished so far. The division must be done
            // in double: int / int truncates to 0 until the very last item
            // completes, so the reported value would never move.
            double completed = (lstFilePaths.Count - queuedThreads) / (double)lstFilePaths.Count;

            // NOTE(review): the neighbouring updates report 10 and 90, while
            // this value lies in [-0.2, 0.8] - confirm the intended scale.
            // NOTE(review): the event is raised on the calling thread while
            // the lock is held; handlers that touch the UI presumably need
            // to marshal to the dispatcher themselves.
            OnProgressChange(completed - 0.2);
        }
    }
}
/// <summary>
/// Produces the checksum string for <paramref name="filePath"/> and signals
/// completion of one queued work item.
/// </summary>
/// <param name="filePath">Path of the file to checksum.</param>
/// <returns>The checksum formatted by <c>BitConverter.ToString</c>.</returns>
string GenerateChecksum(string filePath) {
//Time-consuming checksum generation
//...
// (The actual computation, including the declaration of `checksum`, is elided in this excerpt.)
// Mark one work item as done and wake a thread waiting on the lock.
// NOTE(review): the counter is decremented here, before the caller passes the
// returned value to AddFileData, so a waiter can observe zero before the last
// result has been added - confirm whether that ordering is acceptable.
// NOTE(review): if the elided code throws, this decrement is presumably never
// reached and the waiter would keep waiting - verify.
lock (locker) {
queuedThreads--;
Monitor.Pulse(locker);
}
return BitConverter.ToString(checksum);
}
从后台线程向 UI 报告进度更新的标准模式是使用 IProgress<T>/Progress<T>。与直接使用 Dispatcher 相比,这种更现代的方法有几个好处。
// Progress reporters used by the background work.
// You'd want to set these while on the UI thread.
// E.g., in your constructor:
//   _percentProgress = new Progress<double>(value => ...);
//   _statusProgress = new Progress<string>(value => ...);
IProgress<double> _percentProgress;
IProgress<string> _statusProgress;

/// <summary>
/// Routed-event handler: resets status and progress, runs the synchronous
/// <c>GenerateManifest(CancellationToken)</c> overload through
/// <c>Task.Run</c>, then calls <c>Finished</c> or <c>Cancelled</c>
/// depending on the token state.
/// </summary>
async void GenerateManifest(Object sender, RoutedEventArgs e) {
    status = "Indexing Files";
    progress = 1;
    cancellationTokenSource = new CancellationTokenSource();
    await Task.Run(() => GenerateManifest(cancellationTokenSource.Token));
    if (!cancellationTokenSource.Token.IsCancellationRequested) {
        Finished();
    } else {
        Cancelled();
    }
}

/// <summary>
/// Runs indexing, checksum generation and saving in sequence, reporting a
/// status text and a progress value before the second and third phase.
/// </summary>
/// <param name="cancellationToken">Checked once on entry and passed on to every phase.</param>
void GenerateManifest(CancellationToken cancellationToken) {
    if (cancellationToken.IsCancellationRequested) { return; }
    Async_IndexFiles(initialpath, cancellationToken);
    _statusProgress?.Report("Generating Checksums");
    _percentProgress?.Report(10);
    ProcessChecksums(cancellationToken);
    _statusProgress?.Report("Saving Manifest");
    _percentProgress?.Report(90);
    SaveManifestFile(cancellationToken);
}

/// <summary>
/// Queues one thread-pool work item per entry in <c>lstFilePaths</c>, then
/// blocks on <c>locker</c> until <c>queuedThreads</c> drops to zero,
/// reporting progress through <c>_percentProgress</c> after every wake-up.
/// </summary>
/// <param name="cancellationToken">
/// Checked before each item is queued and before each wait; the method
/// returns early once cancellation has been requested.
/// </param>
void ProcessChecksums(CancellationToken cancellationToken) {
    for (int i = 0; i < lstFilePaths.Count; i++) {
        if (cancellationToken.IsCancellationRequested) { return; }
        string s = lstFilePaths[i];
        lock (locker) { queuedThreads++; }
        ThreadPool.QueueUserWorkItem(x => {
            manifestData.AddFileData(new FileData(s, GenerateChecksum(s)));
        });
    }
    lock (locker) {
        while (queuedThreads > 0) {
            if (cancellationToken.IsCancellationRequested) { return; }
            // Releases the lock until a Monitor.Pulse on the same object.
            Monitor.Wait(locker);
            // Divide as double: int / int truncates to 0 until the very last
            // item completes, so the reported value would never move.
            double completed = (lstFilePaths.Count - queuedThreads) / (double)lstFilePaths.Count;
            // NOTE(review): the neighbouring reports use 10 and 90, while
            // this value lies in [-0.2, 0.8] - confirm the intended scale.
            _percentProgress?.Report(completed - 0.2);
        }
    }
}
代码还有其他一些问题。QueueUserWorkItem 可能应该替换为 Task.Run 加上 await Task.WhenAll,以确保异常不会导致进程崩溃。if (...) Finished(); else Cancelled(); 这段逻辑可能最好通过把返回类型改为 Task 来表达。应使用 ThrowIfCancellationRequested 而不是 IsCancellationRequested。另外,代码中有一种不配对 Pulse 的 Monitor.Wait,只用于进度更新,这很奇怪;让那段代码自己报告进度会更清晰。如果您逐一处理这些问题,您的代码最终应该会更简洁。