多线程 Windows 服务处理 Windows 消息队列
Multi threaded Windows service to process Windows Message Queue
这是我第一次尝试编写 Windows 服务。
此 windows 服务必须处理 2 windows 个消息队列。
每个消息队列都应该有自己的线程,但我似乎无法获得适当的体系结构。
我遵循了这个 Windows Service to run constantly,它允许我创建一个线程,我在其中处理一个队列。
这就是我的服务 class:
protected override void OnStart(string[] args)
{
_thread = new Thread(WorkerThreadFunc) { Name = "Address Calculator Thread", IsBackground = true };
_thread.Start();
}
private void WorkerThreadFunc()
{
_addressCalculator = new GACAddressCalculator();
while (!_shutdownEvent.WaitOne(0))
{
_addressCalculator.StartAddressCalculation();
}
}
protected override void OnStop()
{
_shutdownEvent.Set();
if (!_thread.Join(5000))
{ // give the thread 5 seconds to stop
_thread.Abort();
}
}
在我的 GACAddressCalculator.StartAddressCalculation()
中,我正在创建一个如下所示的队列处理器对象:
public void StartAddressCalculation()
{
try
{
var googleQueue = new GISGoogleQueue("VehMonLogGISGoogle", 1, _gacLogger, 1);
googleQueue.ProccessMessageQueue();
}
catch (Exception ex)
{
}
}
这是GISGoogleQueue
:
public class GISGoogleQueue : BaseMessageQueue
{
public GISGoogleQueue(string queueName, int threadCount, GACLogger logger, int messagesPerThread)
: base(queueName, threadCount, logger, messagesPerThread)
{
}
public override void ProccessMessageQueue()
{
if (!MessageQueue.Exists(base.QueueName))
{
_logger.LogMessage(MessageType.Information, string.Format("Queue '{0}' doesn't exist", this.QueueName));
return;
}
var messageQueue = new MessageQueue(QueueName);
var myVehMonLog = new VehMonLog();
var o = new Object();
var arrTypes = new Type[2];
arrTypes[0] = myVehMonLog.GetType();
arrTypes[1] = o.GetType();
messageQueue.Formatter = new XmlMessageFormatter(arrTypes);
using (var pool = new Pool(ThreadCount))
{
// Infinite loop to process all messages in Queue
for (; ; )
{
for (var i = 0; i < MessagesPerThread; i++)
{
try
{
while (pool.TaskCount() >= MessagesPerThread) ; // Stop execution until Tasks in pool have been executed
var message = messageQueue.Receive(new TimeSpan(0, 0, 5, 0)); // TimeOut for message reading from Queue, set to 5 minutes, Will throw exception after 5 mins
if (message != null) // Check if message is not Null
{
var monLog = (VehMonLog)message.Body;
pool.QueueTask(() => ProcessMessageFromQueue(monLog)); // Add to Tasks list in Pool
}
}
catch (Exception ex)
{
}
}
}
}
}
}
现在这适用于 1 个消息队列,但如果我想处理另一个消息队列,它不会发生,因为我在 ProccessMessageQueue
方法中有一个无限循环。
我想在单独的线程中执行每个队列。
我想我在 WorkerThreadFunc()
中犯了一个错误,我必须以某种方式从那里或 OnStart()
中启动两个线程。
此外,如果您有任何关于如何改进此服务的提示,那就太好了。
顺便说一下,我正在使用此答案 中的池 Class 作为线程池 ProccessMessageQueue
你可以这样使用Parallel.ForEach;
Parallel.ForEach(queueItems, ProcessQueue); //this will process each queue item in a separate thread
private void ProcessQueue(QueueItem queue)
{
//your processing logic
}
我建议更改您的服务 class 如下(下面的评论):
protected override void OnStart(string[] args)
{
_thread = new Thread(WorkerThreadFunc)
{
Name = "Run Constantly Thread",
IsBackground = true
};
_thread.Start();
}
GISGoogleQueue _googleQueue1;
GISGoogleQueue _googleQueue2;
private void WorkerThreadFunc()
{
// This thread is exclusively used to keep the service running.
// As such, there's no real need for a while loop here. Create
// the necessary objects, start them, wait for shutdown, and
// cleanup.
_googleQueue1 = new GISGoogleQueue(...);
_googleQueue1.Start();
_googleQueue2 = new GISGoogleQueue(...);
_googleQueue2.Start();
_shutdownEvent.WaitOne(); // infinite wait
_googleQueue1.Shutdown();
_googleQueue2.Shutdown();
}
protected override void OnStop()
{
_shutdownEvent.Set();
if (!_thread.Join(5000))
{
// give the thread 5 seconds to stop
_thread.Abort();
}
}
我无视你的 GACAddressCalculator
。从您展示的内容来看,它似乎是 GISGoogleQueue
周围的薄包装。显然,如果它确实做了一些您没有展示的事情,则需要将其重新考虑在内。
请注意,在 WorkerThreadFunc()
中创建了两个 GISGoogleQueue
对象。那么接下来让我们看看如何创建那些对象来实现合适的线程模型。
public class GISGoogleQueue : BaseMessageQueue
{
System.Threading.Thread _thread;
System.Threading.ManualResetEvent _shutdownEvent;
public GISGoogleQueue(string queueName, int threadCount, GACLogger logger, int messagesPerThread)
: base(queueName, threadCount, logger, messagesPerThread)
{
// Let this class wrap a thread object. Create it here.
_thread = new Thread(RunMessageQueueFunc()
{
Name = "Run Message Queue Thread " + Guid.NewGuid().ToString(),
IsBackground = true
};
_shutdownEvent = new ManualResetEvent(false);
}
public Start()
{
_thread.Start();
}
public Shutdown()
{
_shutdownEvent.Set();
if (!_thread.Join(5000))
{
// give the thread 5 seconds to stop
_thread.Abort();
}
}
private void RunMessageQueueFunc()
{
if (!MessageQueue.Exists(base.QueueName))
{
_logger.LogMessage(MessageType.Information, string.Format("Queue '{0}' doesn't exist", this.QueueName));
return;
}
var messageQueue = new MessageQueue(QueueName);
var myVehMonLog = new VehMonLog();
var o = new Object();
var arrTypes = new Type[2];
arrTypes[0] = myVehMonLog.GetType();
arrTypes[1] = o.GetType();
messageQueue.Formatter = new XmlMessageFormatter(arrTypes);
using (var pool = new Pool(ThreadCount))
{
// Here's where we'll wait for the shutdown event to occur.
while (!_shutdownEvent.WaitOne(0))
{
for (var i = 0; i < MessagesPerThread; i++)
{
try
{
// Stop execution until Tasks in pool have been executed
while (pool.TaskCount() >= MessagesPerThread) ;
// TimeOut for message reading from Queue, set to 5 minutes, Will throw exception after 5 mins
var message = messageQueue.Receive(new TimeSpan(0, 0, 5, 0));
if (message != null) // Check if message is not Null
{
var monLog = (VehMonLog)message.Body;
pool.QueueTask(() => ProcessMessageFromQueue(monLog)); // Add to Tasks list in Pool
}
}
catch (Exception ex)
{
}
}
}
}
}
}
这种方法的核心是使用 GISGoogleQueue
class 包裹的 Thread
对象。对于您创建的每个 GISGoogleQueue
对象,您都会得到一个包装线程,一旦对 GISGoogleQueue
对象调用 Start()
,该线程就会完成工作。
几点。在 RunMessageQueueFunc()
中,您正在检查队列名称是否存在。如果没有,函数退出。 IF 如果发生这种情况,线程也会退出。关键是您可能希望在此过程中尽早进行检查。只是一个想法。
其次,请注意您的无限循环已被针对 _shutdownEvent
对象的检查所取代。这样,循环将在服务关闭时停止。为了及时性,您需要确保完整地通过循环不会花费太长时间。否则,您可能会在关闭 5 秒后终止线程。中止只是为了确保事情被拆除,但应尽可能避免。
我知道很多人会更喜欢使用 Task
class 来做这样的事情。您似乎在 RunMessageQueueFunc()
里面。但是对于在进程持续时间内 运行 的线程,我认为 Task
class 是错误的选择,因为它会占用线程池中的线程。对我来说,Thread
class 是为了什么而构建的。
HTH
这是我第一次尝试编写 Windows 服务。
此 windows 服务必须处理 2 windows 个消息队列。
每个消息队列都应该有自己的线程,但我似乎无法获得适当的体系结构。
我遵循了这个 Windows Service to run constantly,它允许我创建一个线程,我在其中处理一个队列。
这就是我的服务 class:
protected override void OnStart(string[] args)
{
_thread = new Thread(WorkerThreadFunc) { Name = "Address Calculator Thread", IsBackground = true };
_thread.Start();
}
private void WorkerThreadFunc()
{
_addressCalculator = new GACAddressCalculator();
while (!_shutdownEvent.WaitOne(0))
{
_addressCalculator.StartAddressCalculation();
}
}
protected override void OnStop()
{
_shutdownEvent.Set();
if (!_thread.Join(5000))
{ // give the thread 5 seconds to stop
_thread.Abort();
}
}
在我的 GACAddressCalculator.StartAddressCalculation()
中,我正在创建一个如下所示的队列处理器对象:
public void StartAddressCalculation()
{
try
{
var googleQueue = new GISGoogleQueue("VehMonLogGISGoogle", 1, _gacLogger, 1);
googleQueue.ProccessMessageQueue();
}
catch (Exception ex)
{
}
}
这是GISGoogleQueue
:
public class GISGoogleQueue : BaseMessageQueue
{
public GISGoogleQueue(string queueName, int threadCount, GACLogger logger, int messagesPerThread)
: base(queueName, threadCount, logger, messagesPerThread)
{
}
public override void ProccessMessageQueue()
{
if (!MessageQueue.Exists(base.QueueName))
{
_logger.LogMessage(MessageType.Information, string.Format("Queue '{0}' doesn't exist", this.QueueName));
return;
}
var messageQueue = new MessageQueue(QueueName);
var myVehMonLog = new VehMonLog();
var o = new Object();
var arrTypes = new Type[2];
arrTypes[0] = myVehMonLog.GetType();
arrTypes[1] = o.GetType();
messageQueue.Formatter = new XmlMessageFormatter(arrTypes);
using (var pool = new Pool(ThreadCount))
{
// Infinite loop to process all messages in Queue
for (; ; )
{
for (var i = 0; i < MessagesPerThread; i++)
{
try
{
while (pool.TaskCount() >= MessagesPerThread) ; // Stop execution until Tasks in pool have been executed
var message = messageQueue.Receive(new TimeSpan(0, 0, 5, 0)); // TimeOut for message reading from Queue, set to 5 minutes, Will throw exception after 5 mins
if (message != null) // Check if message is not Null
{
var monLog = (VehMonLog)message.Body;
pool.QueueTask(() => ProcessMessageFromQueue(monLog)); // Add to Tasks list in Pool
}
}
catch (Exception ex)
{
}
}
}
}
}
}
现在这适用于 1 个消息队列,但如果我想处理另一个消息队列,它不会发生,因为我在 ProccessMessageQueue
方法中有一个无限循环。
我想在单独的线程中执行每个队列。
我想我在 WorkerThreadFunc()
中犯了一个错误,我必须以某种方式从那里或 OnStart()
中启动两个线程。
此外,如果您有任何关于如何改进此服务的提示,那就太好了。
顺便说一下,我正在使用此答案 中的池 Class 作为线程池 ProccessMessageQueue
你可以这样使用Parallel.ForEach;
Parallel.ForEach(queueItems, ProcessQueue); //this will process each queue item in a separate thread
private void ProcessQueue(QueueItem queue)
{
//your processing logic
}
我建议更改您的服务 class 如下(下面的评论):
protected override void OnStart(string[] args)
{
_thread = new Thread(WorkerThreadFunc)
{
Name = "Run Constantly Thread",
IsBackground = true
};
_thread.Start();
}
GISGoogleQueue _googleQueue1;
GISGoogleQueue _googleQueue2;
private void WorkerThreadFunc()
{
// This thread is exclusively used to keep the service running.
// As such, there's no real need for a while loop here. Create
// the necessary objects, start them, wait for shutdown, and
// cleanup.
_googleQueue1 = new GISGoogleQueue(...);
_googleQueue1.Start();
_googleQueue2 = new GISGoogleQueue(...);
_googleQueue2.Start();
_shutdownEvent.WaitOne(); // infinite wait
_googleQueue1.Shutdown();
_googleQueue2.Shutdown();
}
protected override void OnStop()
{
_shutdownEvent.Set();
if (!_thread.Join(5000))
{
// give the thread 5 seconds to stop
_thread.Abort();
}
}
我无视你的 GACAddressCalculator
。从您展示的内容来看,它似乎是 GISGoogleQueue
周围的薄包装。显然,如果它确实做了一些您没有展示的事情,则需要将其重新考虑在内。
请注意,在 WorkerThreadFunc()
中创建了两个 GISGoogleQueue
对象。那么接下来让我们看看如何创建那些对象来实现合适的线程模型。
public class GISGoogleQueue : BaseMessageQueue
{
System.Threading.Thread _thread;
System.Threading.ManualResetEvent _shutdownEvent;
public GISGoogleQueue(string queueName, int threadCount, GACLogger logger, int messagesPerThread)
: base(queueName, threadCount, logger, messagesPerThread)
{
// Let this class wrap a thread object. Create it here.
_thread = new Thread(RunMessageQueueFunc()
{
Name = "Run Message Queue Thread " + Guid.NewGuid().ToString(),
IsBackground = true
};
_shutdownEvent = new ManualResetEvent(false);
}
public Start()
{
_thread.Start();
}
public Shutdown()
{
_shutdownEvent.Set();
if (!_thread.Join(5000))
{
// give the thread 5 seconds to stop
_thread.Abort();
}
}
private void RunMessageQueueFunc()
{
if (!MessageQueue.Exists(base.QueueName))
{
_logger.LogMessage(MessageType.Information, string.Format("Queue '{0}' doesn't exist", this.QueueName));
return;
}
var messageQueue = new MessageQueue(QueueName);
var myVehMonLog = new VehMonLog();
var o = new Object();
var arrTypes = new Type[2];
arrTypes[0] = myVehMonLog.GetType();
arrTypes[1] = o.GetType();
messageQueue.Formatter = new XmlMessageFormatter(arrTypes);
using (var pool = new Pool(ThreadCount))
{
// Here's where we'll wait for the shutdown event to occur.
while (!_shutdownEvent.WaitOne(0))
{
for (var i = 0; i < MessagesPerThread; i++)
{
try
{
// Stop execution until Tasks in pool have been executed
while (pool.TaskCount() >= MessagesPerThread) ;
// TimeOut for message reading from Queue, set to 5 minutes, Will throw exception after 5 mins
var message = messageQueue.Receive(new TimeSpan(0, 0, 5, 0));
if (message != null) // Check if message is not Null
{
var monLog = (VehMonLog)message.Body;
pool.QueueTask(() => ProcessMessageFromQueue(monLog)); // Add to Tasks list in Pool
}
}
catch (Exception ex)
{
}
}
}
}
}
}
这种方法的核心是使用 GISGoogleQueue
class 包裹的 Thread
对象。对于您创建的每个 GISGoogleQueue
对象,您都会得到一个包装线程,一旦对 GISGoogleQueue
对象调用 Start()
,该线程就会完成工作。
几点。在 RunMessageQueueFunc()
中,您正在检查队列名称是否存在。如果没有,函数退出。 IF 如果发生这种情况,线程也会退出。关键是您可能希望在此过程中尽早进行检查。只是一个想法。
其次,请注意您的无限循环已被针对 _shutdownEvent
对象的检查所取代。这样,循环将在服务关闭时停止。为了及时性,您需要确保完整地通过循环不会花费太长时间。否则,您可能会在关闭 5 秒后终止线程。中止只是为了确保事情被拆除,但应尽可能避免。
我知道很多人会更喜欢使用 Task
class 来做这样的事情。您似乎在 RunMessageQueueFunc()
里面。但是对于在进程持续时间内 运行 的线程,我认为 Task
class 是错误的选择,因为它会占用线程池中的线程。对我来说,Thread
class 是为了什么而构建的。
HTH