.NET IBM MQ Listener 未确认消息并从队列开头读取
.NET IBM MQ Listener unacknowledged message and reading from the beginning of the queue
我有一个 C# 应用程序,它设置了许多 MQ 侦听器(多个线程和可能的多个服务器,每个服务器都有自己的侦听器)。有些消息会离开队列,我想留在队列中,继续处理 MQ 上的下一条消息,但在某些情况下,我会想回去重新阅读这些消息......
var connectionFactory = XMSFactoryFactory.GetInstance(XMSC.CT_WMQ).CreateConnectionFactory();
connectionFactory.SetStringProperty(XMSC.WMQ_HOST_NAME, origination.Server);
connectionFactory.SetIntProperty(XMSC.WMQ_PORT, int.Parse(origination.Port));
connectionFactory.SetStringProperty(XMSC.WMQ_QUEUE_MANAGER, origination.QueueManager);
connectionFactory.SetStringProperty(XMSC.WMQ_CHANNEL, origination.Channel);
var connection = connectionFactory.CreateConnection(null, null);
_connections.Add(connection);
var session = connection.CreateSession(false, AcknowledgeMode.ClientAcknowledge); //changed to use ClientAcknowledge so that we will leave the message on the MQ until we're sure we're processing it
_sessions.Add(session);
var destination = session.CreateQueue(origination.Queue);
_destinations.Add(destination);
var consumer = session.CreateConsumer(destination);
_consumers.Add(consumer);
Logging.LogDebugMessage(Constants.ListenerStart);
connection.Start();
ThreadPool.QueueUserWorkItem((o) => Receive(forOrigination, consumer));
那我有...
if (OnMQMessageReceived != null)
{
var message = consumer.Receive();
var identifier = string.Empty;
if (message is ITextMessage)
{
//do stuff with the message here
//populates identifier from the message
}
else
{
//do stuff with the message here
//populates identifier from the message
}
if (!string.IsNullOrWhiteSpace(identifier)&& OnMQMessageReceived != null)
{
if( some check to see if we should process the message now)
{
//process message here
message.Acknowledge(); //this really pulls it off of the MQ
//here is where I want to trigger the next read to be from the beginning of the MQ
}
else
{
//We actually want to do nothing here. As in do not do Acknowledge
//This leaves the message on the MQ and we'll pick it up again later
//But we want to move on to the next message in the MQ
}
}
else
{
message.Acknowledge(); //this really pulls it off of the MQ...its useless to us anyways
}
}
else
{
Thread.Sleep(0);
}
ThreadPool.QueueUserWorkItem((o) => Receive(forOrigination, consumer));
所以有几个问题:
如果我不确认消息它会留在 MQ 上,对吗?
如果消息未被确认,那么默认情况下,当我使用相同的侦听器再次从 MQ 读取时,它会读取下一个并且不会转到开头,对吗?
如何更改侦听器以便下次阅读时从队列的开头开始?
将消息留在队列中是一种反模式。如果您不想或不能在逻辑的某个点处理消息,那么您有多种选择:
- 将其从队列中取出并放入另一个 queue/topic 进行 delayed/different 处理。
- 将其从队列中取出并转储到数据库、平面文件 - 无论如何,如果您想在消息流之外处理它,或者根本不想处理。
- 如果可行,您可能希望更改消息生成器,使其不会将具有不同处理要求的消息混合在同一个 queue/topic。
无论如何,不要在队列中留言,总是前进到下一条留言。这将使应用程序的方式更可预测,更容易推理。您还将避免各种性能问题。如果您的应用程序对消息传递的顺序敏感或可能对消息传递的顺序敏感,那么手动确认所选消息也会与它不一致。
您的问题:
JMS 规范对未确认消息的行为含糊不清 - 它们可能会乱序传递,并且未定义传递它们的确切时间。此外,确认方法调用将确认所有以前收到和未确认的消息 - 可能不是您想要的。
如果您留下消息,听众可能会也可能不会立即返回。如果您重新启动它,它当然会重新开始,但是当它坐在那里等待消息时它取决于实现。
因此,如果您尝试让您的设计发挥作用,您可能会在某些情况下使其发挥作用,但它是不可预测的或不可靠的。
我有一个 C# 应用程序,它设置了许多 MQ 侦听器(多个线程和可能的多个服务器,每个服务器都有自己的侦听器)。有些消息会离开队列,我想留在队列中,继续处理 MQ 上的下一条消息,但在某些情况下,我会想回去重新阅读这些消息......
var connectionFactory = XMSFactoryFactory.GetInstance(XMSC.CT_WMQ).CreateConnectionFactory();
connectionFactory.SetStringProperty(XMSC.WMQ_HOST_NAME, origination.Server);
connectionFactory.SetIntProperty(XMSC.WMQ_PORT, int.Parse(origination.Port));
connectionFactory.SetStringProperty(XMSC.WMQ_QUEUE_MANAGER, origination.QueueManager);
connectionFactory.SetStringProperty(XMSC.WMQ_CHANNEL, origination.Channel);
var connection = connectionFactory.CreateConnection(null, null);
_connections.Add(connection);
var session = connection.CreateSession(false, AcknowledgeMode.ClientAcknowledge); //changed to use ClientAcknowledge so that we will leave the message on the MQ until we're sure we're processing it
_sessions.Add(session);
var destination = session.CreateQueue(origination.Queue);
_destinations.Add(destination);
var consumer = session.CreateConsumer(destination);
_consumers.Add(consumer);
Logging.LogDebugMessage(Constants.ListenerStart);
connection.Start();
ThreadPool.QueueUserWorkItem((o) => Receive(forOrigination, consumer));
那我有...
if (OnMQMessageReceived != null)
{
var message = consumer.Receive();
var identifier = string.Empty;
if (message is ITextMessage)
{
//do stuff with the message here
//populates identifier from the message
}
else
{
//do stuff with the message here
//populates identifier from the message
}
if (!string.IsNullOrWhiteSpace(identifier)&& OnMQMessageReceived != null)
{
if( some check to see if we should process the message now)
{
//process message here
message.Acknowledge(); //this really pulls it off of the MQ
//here is where I want to trigger the next read to be from the beginning of the MQ
}
else
{
//We actually want to do nothing here. As in do not do Acknowledge
//This leaves the message on the MQ and we'll pick it up again later
//But we want to move on to the next message in the MQ
}
}
else
{
message.Acknowledge(); //this really pulls it off of the MQ...its useless to us anyways
}
}
else
{
Thread.Sleep(0);
}
ThreadPool.QueueUserWorkItem((o) => Receive(forOrigination, consumer));
所以有几个问题:
如果我不确认消息它会留在 MQ 上,对吗?
如果消息未被确认,那么默认情况下,当我使用相同的侦听器再次从 MQ 读取时,它会读取下一个并且不会转到开头,对吗?
如何更改侦听器以便下次阅读时从队列的开头开始?
将消息留在队列中是一种反模式。如果您不想或不能在逻辑的某个点处理消息,那么您有多种选择:
- 将其从队列中取出并放入另一个 queue/topic 进行 delayed/different 处理。
- 将其从队列中取出并转储到数据库、平面文件 - 无论如何,如果您想在消息流之外处理它,或者根本不想处理。
- 如果可行,您可能希望更改消息生成器,使其不会将具有不同处理要求的消息混合在同一个 queue/topic。
无论如何,不要在队列中留言,总是前进到下一条留言。这将使应用程序的方式更可预测,更容易推理。您还将避免各种性能问题。如果您的应用程序对消息传递的顺序敏感或可能对消息传递的顺序敏感,那么手动确认所选消息也会与它不一致。
您的问题:
JMS 规范对未确认消息的行为含糊不清 - 它们可能会乱序传递,并且未定义传递它们的确切时间。此外,确认方法调用将确认所有以前收到和未确认的消息 - 可能不是您想要的。
如果您留下消息,听众可能会也可能不会立即返回。如果您重新启动它,它当然会重新开始,但是当它坐在那里等待消息时它取决于实现。
因此,如果您尝试让您的设计发挥作用,您可能会在某些情况下使其发挥作用,但它是不可预测的或不可靠的。