监控队列的线程<Actions>
Threads monitoring a Queue<Actions>
我正在做一个使用 SNMP 映射网络(仅限路由器)的小项目。为了加快速度,除了由主线程完成的第一项工作外,我正在尝试让 池 线程负责完成我需要的工作。
此时我有两个作业,一个带参数,另一个不带参数:
UpdateDeviceInfo(NetworkDevice nd)
UpdateLinks()
*尚未定义
我想要实现的是让那些等待工作的工作线程
出现在 Queue<Action>
上并在它为空时等待。主线程将添加第一个作业,然后等待所有可能添加更多作业的工作人员完成,然后再开始添加第二个作业并唤醒休眠线程。
我的problem/questions是:
如何定义 Queue<Actions>
以便我可以插入方法和参数(如果有)。如果不可能,我可以让所有函数接受相同的参数。
如何无限期地启动 working 线程。我不确定应该在哪里创建 for(;;)
.
到目前为止,这是我的代码:
public enum DatabaseState
{
Empty = 0,
Learning = 1,
Updating = 2,
Stable = 3,
Exiting = 4
};
public class NetworkDB
{
public Dictionary<string, NetworkDevice> database;
private Queue<Action<NetworkDevice>> jobs;
private string _community;
private string _ipaddress;
private Object _statelock = new Object();
private DatabaseState _state = DatabaseState.Empty;
private readonly int workers = 4;
private Object _threadswaitinglock = new Object();
private int _threadswaiting = 0;
public Dictionary<string, NetworkDevice> Database { get => database; set => database = value; }
public NetworkDB(string community, string ipaddress)
{
_community = community;
_ipaddress = ipaddress;
database = new Dictionary<string, NetworkDevice>();
jobs = new Queue<Action<NetworkDevice>>();
}
public void Start()
{
NetworkDevice nd = SNMP.GetDeviceInfo(new IpAddress(_ipaddress), _community);
if (nd.Status > NetworkDeviceStatus.Unknown)
{
database.Add(nd.Id, nd);
_state = DatabaseState.Learning;
nd.Update(this); // The first job is done by the main thread
for (int i = 0; i < workers; i++)
{
Thread t = new Thread(JobRemove);
t.Start();
}
lock (_statelock)
{
if (_state == DatabaseState.Learning)
{
Monitor.Wait(_statelock);
}
}
lock (_statelock)
{
if (_state == DatabaseState.Updating)
{
Monitor.Wait(_statelock);
}
}
foreach (KeyValuePair<string, NetworkDevice> n in database)
{
using (System.IO.StreamWriter file = new System.IO.StreamWriter(n.Value.Name + ".txt")
{
file.WriteLine(n);
}
}
}
}
public void JobInsert(Action<NetworkDevice> func, NetworkDevice nd)
{
lock (jobs)
{
jobs.Enqueue(item);
if (jobs.Count == 1)
{
// wake up any blocked dequeue
Monitor.Pulse(jobs);
}
}
}
public void JobRemove()
{
Action<NetworkDevice> item;
lock (jobs)
{
while (jobs.Count == 0)
{
lock (_threadswaitinglock)
{
_threadswaiting += 1;
if (_threadswaiting == workers)
Monitor.Pulse(_statelock);
}
Monitor.Wait(jobs);
}
lock (_threadswaitinglock)
{
_threadswaiting -= 1;
}
item = jobs.Dequeue();
item.Invoke();
}
}
public bool NetworkDeviceExists(NetworkDevice nd)
{
try
{
Monitor.Enter(database);
if (database.ContainsKey(nd.Id))
{
return true;
}
else
{
database.Add(nd.Id, nd);
Action<NetworkDevice> action = new Action<NetworkDevice>(UpdateDeviceInfo);
jobs.Enqueue(action);
return false;
}
}
finally
{
Monitor.Exit(database);
}
}
//Job1 - Learning -> Update device info
public void UpdateDeviceInfo(NetworkDevice nd)
{
nd.Update(this);
try
{
Monitor.Enter(database);
nd.Status = NetworkDeviceStatus.Self;
}
finally
{
Monitor.Exit(database);
}
}
//Job2 - Updating -> After Learning, create links between neighbours
private void UpdateLinks()
{
}
}
您最好的选择似乎是使用 BlockingCollection 而不是 Queue class。它们在 FIFO 方面的行为实际上相同,但 BlockingCollection 会让您的每个线程阻塞,直到可以通过调用 GetConsumingEnumerable 或 Take 获取一个项目。这是一个完整的例子。
http://mikehadlow.blogspot.com/2012/11/using-blockingcollection-to-communicate.html?m=1
至于包含参数,您似乎可以使用闭包来封闭 NetworkDevice 本身,然后将 Action 入队而不是 Action<>
我正在做一个使用 SNMP 映射网络(仅限路由器)的小项目。为了加快速度,除了由主线程完成的第一项工作外,我正在尝试让 池 线程负责完成我需要的工作。
此时我有两个作业,一个带参数,另一个不带参数:
UpdateDeviceInfo(NetworkDevice nd)
UpdateLinks()
*尚未定义
我想要实现的是让那些等待工作的工作线程
出现在 Queue<Action>
上并在它为空时等待。主线程将添加第一个作业,然后等待所有可能添加更多作业的工作人员完成,然后再开始添加第二个作业并唤醒休眠线程。
我的problem/questions是:
如何定义
Queue<Actions>
以便我可以插入方法和参数(如果有)。如果不可能,我可以让所有函数接受相同的参数。如何无限期地启动 working 线程。我不确定应该在哪里创建
for(;;)
.
到目前为止,这是我的代码:
public enum DatabaseState
{
Empty = 0,
Learning = 1,
Updating = 2,
Stable = 3,
Exiting = 4
};
public class NetworkDB
{
public Dictionary<string, NetworkDevice> database;
private Queue<Action<NetworkDevice>> jobs;
private string _community;
private string _ipaddress;
private Object _statelock = new Object();
private DatabaseState _state = DatabaseState.Empty;
private readonly int workers = 4;
private Object _threadswaitinglock = new Object();
private int _threadswaiting = 0;
public Dictionary<string, NetworkDevice> Database { get => database; set => database = value; }
public NetworkDB(string community, string ipaddress)
{
_community = community;
_ipaddress = ipaddress;
database = new Dictionary<string, NetworkDevice>();
jobs = new Queue<Action<NetworkDevice>>();
}
public void Start()
{
NetworkDevice nd = SNMP.GetDeviceInfo(new IpAddress(_ipaddress), _community);
if (nd.Status > NetworkDeviceStatus.Unknown)
{
database.Add(nd.Id, nd);
_state = DatabaseState.Learning;
nd.Update(this); // The first job is done by the main thread
for (int i = 0; i < workers; i++)
{
Thread t = new Thread(JobRemove);
t.Start();
}
lock (_statelock)
{
if (_state == DatabaseState.Learning)
{
Monitor.Wait(_statelock);
}
}
lock (_statelock)
{
if (_state == DatabaseState.Updating)
{
Monitor.Wait(_statelock);
}
}
foreach (KeyValuePair<string, NetworkDevice> n in database)
{
using (System.IO.StreamWriter file = new System.IO.StreamWriter(n.Value.Name + ".txt")
{
file.WriteLine(n);
}
}
}
}
public void JobInsert(Action<NetworkDevice> func, NetworkDevice nd)
{
lock (jobs)
{
jobs.Enqueue(item);
if (jobs.Count == 1)
{
// wake up any blocked dequeue
Monitor.Pulse(jobs);
}
}
}
public void JobRemove()
{
Action<NetworkDevice> item;
lock (jobs)
{
while (jobs.Count == 0)
{
lock (_threadswaitinglock)
{
_threadswaiting += 1;
if (_threadswaiting == workers)
Monitor.Pulse(_statelock);
}
Monitor.Wait(jobs);
}
lock (_threadswaitinglock)
{
_threadswaiting -= 1;
}
item = jobs.Dequeue();
item.Invoke();
}
}
public bool NetworkDeviceExists(NetworkDevice nd)
{
try
{
Monitor.Enter(database);
if (database.ContainsKey(nd.Id))
{
return true;
}
else
{
database.Add(nd.Id, nd);
Action<NetworkDevice> action = new Action<NetworkDevice>(UpdateDeviceInfo);
jobs.Enqueue(action);
return false;
}
}
finally
{
Monitor.Exit(database);
}
}
//Job1 - Learning -> Update device info
public void UpdateDeviceInfo(NetworkDevice nd)
{
nd.Update(this);
try
{
Monitor.Enter(database);
nd.Status = NetworkDeviceStatus.Self;
}
finally
{
Monitor.Exit(database);
}
}
//Job2 - Updating -> After Learning, create links between neighbours
private void UpdateLinks()
{
}
}
您最好的选择似乎是使用 BlockingCollection 而不是 Queue class。它们在 FIFO 方面的行为实际上相同,但 BlockingCollection 会让您的每个线程阻塞,直到可以通过调用 GetConsumingEnumerable 或 Take 获取一个项目。这是一个完整的例子。
http://mikehadlow.blogspot.com/2012/11/using-blockingcollection-to-communicate.html?m=1
至于包含参数,您似乎可以使用闭包来封闭 NetworkDevice 本身,然后将 Action 入队而不是 Action<>