使用 TPL and/or BackgroundWorker 处理间歇性 IO 作业
Processing intermittent IO jobs with TPL and/or BackgroundWorker
我有一个与一些串行和 USB 设备交互的 C# 应用程序。我正在尝试找到一种以并行方式命令这些设备的好方法,以便可以同时控制许多设备。我将通过用户输入和脚本命令向设备启动命令。
我目前正在尝试找出一种与 UI 线程并行执行 "collect" 和 运行 命令的简洁方法。我有多个表单,每个设备一个,每个都有几个可以发出命令的控件。
我目前看到的方式是用户单击一个按钮,该按钮将触发表单中的一个事件。另一个 class 让我们称之为 CommandManager
将挂钩所有这些事件;每个事件传递必要的信息以形成发送到设备的命令。
当事件由 CommandManager
处理时,它形成命令并将其添加到名为 DeviceCommandWorker
的子classed BackgroundWorker
中的 BlockingCollection<Command>
] 在应用程序打开时启动。它所做的只是遍历包含 Task.Factory.StartNew()
调用的代码块。
在 StartNew
块中 Take
在 BlockingCollection
上调用,等待输入命令。一旦命令在集合中,Take
returns 和 Task
就会开始运行。 BackgroundWorker
循环并重复该过程,直到它被取消。
// Event handler running on the UI Thread
public void ProcessCommand(DeviceCommand command)
{
// I assume I cannot add to Commands (BlockingCollection) from here?
DeviceCommandWorker.Instance.Commands.Add(command);
}
// ....
// BackgroundWorker started upon Application startup
// ...
public class DeviceCommandWorker : BackgroundWorker
{
public static DeviceCommandWorker Instance { get { return lazyInstance.Value; } }
private static readonly Lazy<DeviceCommandWorker> lazyInstance = new Lazy<DeviceCommandWorker>(() => new DeviceCommandWorker());
public BlockingCollection<DeviceCommand> Commands { get; set; }
private DeviceCommandWorker()
{
WorkerSupportsCancellation = true;
Commands = new BlockingCollection<DeviceCommand>();
}
protected override void OnDoWork(DoWorkEventArgs e)
{
while (!CancellationPending)
{
var commandTask = Task.Factory.StartNew(() =>
{
// If the BackGroundWorker is cancelled, how to esacpe this blocking call?
DeviceCommand command = commandQueue.Take();
DeviceCommandResult result;
command.Process(out result);
if(result == DeviceCommandResult.Error)
; // How would I inform the UI of this result?
});
}
e.Cancel = true;
}
}
我的问题已在上面的代码中陈述,但我会重申它们。
我的第一个问题是我认为我不能从 运行ning BackGroundWorker
之外添加到 BlockingCollection
? (通过阅读 TPL,添加时不应该有一个我可以锁定的同步对象吗?)
假设我可以添加到集合中,当 Take
方法阻塞时是否没有办法逃脱,特别是如果 BackgroundWorker
被取消,它不会永远被阻塞吗? (可能在取消之前我可以发送一个 "custom command" 来创建一个什么也不做的任务,然后我就可以退出 while 循环)
最后,我如何将命令执行成功或错误报告回 UI 线程?我看到了这个SO answer,这个方向对吗?
因此,经过更多的修改后,我似乎得到了上面的想法,最初对我来说效果很好。我不确定它在做一些实际工作时会如何表现,但到目前为止我很高兴。
所以我重新编写了代码。我发现如果 Task.Factory.StartNew()
之外没有 Take
,我实际上只是让任务尽可能快地等待从 BlockingCollection
消耗。所以我把语句移到了循环之外。我还确认要摆脱阻塞 Take
我需要发送某种特殊命令以便我可以停止后台工作程序。最后(未显示)我计划使用 Control.Invoke
来 return 任何 UI 线程的失败。
public Boolean StartCommandWorker()
{
if (DeviceCommandWorker.Instance.IsBusy)
return false;
else
{
Console.Out.WriteLine("Called start command worker!");
DeviceCommandWorker.Instance.RunWorkerAsync();
return DeviceCommandWorker.Instance.IsBusy;
}
}
public void StopCommandWorker()
{
Console.Out.WriteLine("Called stop command worker!");
ProcessCommand("QUIT");
DeviceCommandWorker.Instance.CancelAsync();
}
public Boolean ProcessCommand(String command)
{
DeviceCommandWorker.Instance.Commands.Add(command);
Console.Out.WriteLine("Enqueued command \"" + command + "\" ThreadID: " + Thread.CurrentThread.ManagedThreadId);
return true;
}
internal class DeviceCommandWorker : BackgroundWorker
{
public static DeviceCommandWorker Instance { get { return lazyInstance.Value; } }
private static readonly Lazy<DeviceCommandWorker> lazyInstance = new Lazy<DeviceCommandWorker>(() => new DeviceCommandWorker());
public BlockingCollection<String> Commands { get; set; }
private DeviceCommandWorker()
{
WorkerSupportsCancellation = true;
Commands = new BlockingCollection<String>();
}
protected override void OnDoWork(DoWorkEventArgs e)
{
Console.Out.WriteLine("Background Worker Started ThreadID: " + Thread.CurrentThread.ManagedThreadId);
while (!CancellationPending)
{
Console.Out.WriteLine("Waiting for command on ThreadID: " + Thread.CurrentThread.ManagedThreadId);
String aString = Commands.Take();
var commandTask = Task.Factory.StartNew(() =>
{
Console.Out.WriteLine(" Dequeued command \"" + aString + "\" ThreadID: " + Thread.CurrentThread.ManagedThreadId);
if (aString.Equals("QUIT"))
Console.Out.WriteLine(" Quit worker called: " + aString);
});
}
Console.Out.WriteLine("Background Worker: Stopped!");
e.Cancel = true;
}
}
这是一些示例输出。我制作了一个小的 UI 表单,我可以用它来启动、停止和发送命令。它只是发送一个随机的 double 作为命令。
Called start command worker!
Background Worker Started ThreadID: 17
Waiting for command on ThreadID: 17
Enqueued command "Hello" ThreadID: 10
Waiting for command on ThreadID: 17
Dequeued command "Hello" ThreadID: 16
Enqueued command "0.0745" ThreadID: 10
Waiting for command on ThreadID: 17
Dequeued command "0.0745" ThreadID: 12
Enqueued command "0.7043" ThreadID: 10
Dequeued command "0.7043" ThreadID: 16
Waiting for command on ThreadID: 17
Called stop command worker!
Enqueued command "QUIT" ThreadID: 10
Dequeued command "QUIT" ThreadID: 12
Quit worker called: QUIT
Background Worker: Stopped!
Enqueued command "0.2646" ThreadID: 10
Enqueued command "0.1619" ThreadID: 10
Enqueued command "0.5570" ThreadID: 10
Enqueued command "0.4129" ThreadID: 10
Called start command worker!
Background Worker Started ThreadID: 12
Waiting for command on ThreadID: 12
Waiting for command on ThreadID: 12
Waiting for command on ThreadID: 12
Waiting for command on ThreadID: 12
Waiting for command on ThreadID: 12
Dequeued command "0.2646" ThreadID: 16
Dequeued command "0.1619" ThreadID: 16
Dequeued command "0.5570" ThreadID: 16
Dequeued command "0.4129" ThreadID: 16
Enqueued command "0.8368" ThreadID: 10
Dequeued command "0.8368" ThreadID: 17
Waiting for command on ThreadID: 12
Called stop command worker!
Enqueued command "QUIT" ThreadID: 10
Dequeued command "QUIT" ThreadID: 16
Quit worker called: QUIT
Background Worker: Stopped!
我有一个与一些串行和 USB 设备交互的 C# 应用程序。我正在尝试找到一种以并行方式命令这些设备的好方法,以便可以同时控制许多设备。我将通过用户输入和脚本命令向设备启动命令。
我目前正在尝试找出一种与 UI 线程并行执行 "collect" 和 运行 命令的简洁方法。我有多个表单,每个设备一个,每个都有几个可以发出命令的控件。
我目前看到的方式是用户单击一个按钮,该按钮将触发表单中的一个事件。另一个 class 让我们称之为 CommandManager
将挂钩所有这些事件;每个事件传递必要的信息以形成发送到设备的命令。
当事件由 CommandManager
处理时,它形成命令并将其添加到名为 DeviceCommandWorker
的子classed BackgroundWorker
中的 BlockingCollection<Command>
] 在应用程序打开时启动。它所做的只是遍历包含 Task.Factory.StartNew()
调用的代码块。
在 StartNew
块中 Take
在 BlockingCollection
上调用,等待输入命令。一旦命令在集合中,Take
returns 和 Task
就会开始运行。 BackgroundWorker
循环并重复该过程,直到它被取消。
// Event handler running on the UI Thread
public void ProcessCommand(DeviceCommand command)
{
// I assume I cannot add to Commands (BlockingCollection) from here?
DeviceCommandWorker.Instance.Commands.Add(command);
}
// ....
// BackgroundWorker started upon Application startup
// ...
public class DeviceCommandWorker : BackgroundWorker
{
public static DeviceCommandWorker Instance { get { return lazyInstance.Value; } }
private static readonly Lazy<DeviceCommandWorker> lazyInstance = new Lazy<DeviceCommandWorker>(() => new DeviceCommandWorker());
public BlockingCollection<DeviceCommand> Commands { get; set; }
private DeviceCommandWorker()
{
WorkerSupportsCancellation = true;
Commands = new BlockingCollection<DeviceCommand>();
}
protected override void OnDoWork(DoWorkEventArgs e)
{
while (!CancellationPending)
{
var commandTask = Task.Factory.StartNew(() =>
{
// If the BackGroundWorker is cancelled, how to esacpe this blocking call?
DeviceCommand command = commandQueue.Take();
DeviceCommandResult result;
command.Process(out result);
if(result == DeviceCommandResult.Error)
; // How would I inform the UI of this result?
});
}
e.Cancel = true;
}
}
我的问题已在上面的代码中陈述,但我会重申它们。
我的第一个问题是我认为我不能从 运行ning BackGroundWorker
之外添加到 BlockingCollection
? (通过阅读 TPL,添加时不应该有一个我可以锁定的同步对象吗?)
假设我可以添加到集合中,当 Take
方法阻塞时是否没有办法逃脱,特别是如果 BackgroundWorker
被取消,它不会永远被阻塞吗? (可能在取消之前我可以发送一个 "custom command" 来创建一个什么也不做的任务,然后我就可以退出 while 循环)
最后,我如何将命令执行成功或错误报告回 UI 线程?我看到了这个SO answer,这个方向对吗?
因此,经过更多的修改后,我似乎得到了上面的想法,最初对我来说效果很好。我不确定它在做一些实际工作时会如何表现,但到目前为止我很高兴。
所以我重新编写了代码。我发现如果 Task.Factory.StartNew()
之外没有 Take
,我实际上只是让任务尽可能快地等待从 BlockingCollection
消耗。所以我把语句移到了循环之外。我还确认要摆脱阻塞 Take
我需要发送某种特殊命令以便我可以停止后台工作程序。最后(未显示)我计划使用 Control.Invoke
来 return 任何 UI 线程的失败。
public Boolean StartCommandWorker()
{
if (DeviceCommandWorker.Instance.IsBusy)
return false;
else
{
Console.Out.WriteLine("Called start command worker!");
DeviceCommandWorker.Instance.RunWorkerAsync();
return DeviceCommandWorker.Instance.IsBusy;
}
}
public void StopCommandWorker()
{
Console.Out.WriteLine("Called stop command worker!");
ProcessCommand("QUIT");
DeviceCommandWorker.Instance.CancelAsync();
}
public Boolean ProcessCommand(String command)
{
DeviceCommandWorker.Instance.Commands.Add(command);
Console.Out.WriteLine("Enqueued command \"" + command + "\" ThreadID: " + Thread.CurrentThread.ManagedThreadId);
return true;
}
internal class DeviceCommandWorker : BackgroundWorker
{
public static DeviceCommandWorker Instance { get { return lazyInstance.Value; } }
private static readonly Lazy<DeviceCommandWorker> lazyInstance = new Lazy<DeviceCommandWorker>(() => new DeviceCommandWorker());
public BlockingCollection<String> Commands { get; set; }
private DeviceCommandWorker()
{
WorkerSupportsCancellation = true;
Commands = new BlockingCollection<String>();
}
protected override void OnDoWork(DoWorkEventArgs e)
{
Console.Out.WriteLine("Background Worker Started ThreadID: " + Thread.CurrentThread.ManagedThreadId);
while (!CancellationPending)
{
Console.Out.WriteLine("Waiting for command on ThreadID: " + Thread.CurrentThread.ManagedThreadId);
String aString = Commands.Take();
var commandTask = Task.Factory.StartNew(() =>
{
Console.Out.WriteLine(" Dequeued command \"" + aString + "\" ThreadID: " + Thread.CurrentThread.ManagedThreadId);
if (aString.Equals("QUIT"))
Console.Out.WriteLine(" Quit worker called: " + aString);
});
}
Console.Out.WriteLine("Background Worker: Stopped!");
e.Cancel = true;
}
}
这是一些示例输出。我制作了一个小的 UI 表单,我可以用它来启动、停止和发送命令。它只是发送一个随机的 double 作为命令。
Called start command worker!
Background Worker Started ThreadID: 17
Waiting for command on ThreadID: 17
Enqueued command "Hello" ThreadID: 10
Waiting for command on ThreadID: 17
Dequeued command "Hello" ThreadID: 16
Enqueued command "0.0745" ThreadID: 10
Waiting for command on ThreadID: 17
Dequeued command "0.0745" ThreadID: 12
Enqueued command "0.7043" ThreadID: 10
Dequeued command "0.7043" ThreadID: 16
Waiting for command on ThreadID: 17
Called stop command worker!
Enqueued command "QUIT" ThreadID: 10
Dequeued command "QUIT" ThreadID: 12
Quit worker called: QUIT
Background Worker: Stopped!
Enqueued command "0.2646" ThreadID: 10
Enqueued command "0.1619" ThreadID: 10
Enqueued command "0.5570" ThreadID: 10
Enqueued command "0.4129" ThreadID: 10
Called start command worker!
Background Worker Started ThreadID: 12
Waiting for command on ThreadID: 12
Waiting for command on ThreadID: 12
Waiting for command on ThreadID: 12
Waiting for command on ThreadID: 12
Waiting for command on ThreadID: 12
Dequeued command "0.2646" ThreadID: 16
Dequeued command "0.1619" ThreadID: 16
Dequeued command "0.5570" ThreadID: 16
Dequeued command "0.4129" ThreadID: 16
Enqueued command "0.8368" ThreadID: 10
Dequeued command "0.8368" ThreadID: 17
Waiting for command on ThreadID: 12
Called stop command worker!
Enqueued command "QUIT" ThreadID: 10
Dequeued command "QUIT" ThreadID: 16
Quit worker called: QUIT
Background Worker: Stopped!