WMQ消息组并行执行
WMQ Message Group Parallel execution
我有一个需求,当消费者宕机时在队列中存储大量消息,并在消费者恢复时按顺序处理它们。我想将逻辑上相关的消息组合在一起并并行执行所有组。
例如:考虑下面有 3 个组 A、B、C 的队列。我已将 seqnum 分配给内容,并将唯一的 groupId 正确分配给组。
队列 1
A1,A2,A3-last,B1,B2-last,C1,C2,c3,C4-last
- 是否可以并行获取A1、B1、C1?
- 此外,目前消费者正在按顺序正确获取 A1、A2 和 A3。但是它无法从 B 组或 C 组中获取任何内容。可能是哪里出了问题?
任何建议都会有很大帮助。
首先阅读来自 MQ 知识的 page。
我将伪代码(从上面的网页)转换为 MQ 类 for Java,并将其从浏览更改为破坏性获取。
此外,我更喜欢在同步点下处理每组消息(假设组的大小合理)。
MQGetMessageOptions gmo = new MQGetMessageOptions();
MQMessage rcvMsg = new MQMessage();
/* Get the first message in a group, or a message not in a group */
gmo.Options = CMQC.MQGMO_COMPLETE_MSG | CMQC.MQGMO_LOGICAL_ORDER | CMQC.MQGMO_ALL_MSGS_AVAILABLE | CMQC.MQGMO_WAIT | CMQC.MQGMO_SYNCPOINT;
gmo.MatchOptions = CMQC.MQMO_MATCH_MSG_SEQ_NUMBER;
rcvMsg.messageSequenceNumber = 1;
rcvMsg.groupId = CMQC.MQGI_NONE;
inQ.get(rcvMsg, gmo);
/* Examine first or only message */
...
gmo.Options = CMQC.MQGMO_COMPLETE_MSG | CMQC.MQGMO_LOGICAL_ORDER | CMQC.MQGMO_SYNCPOINT;
do while ((rcvMsg.messageFlags & CMQC.MQMF_MSG_IN_GROUP) == CMQC.MQMF_MSG_IN_GROUP)
{
rcvMsg.clearMessage();
inQ.get(rcvMsg, gmo);
/* Examine each remaining message in the group */
...
}
qMgr.commit();
必须完成一些编码工作:(1) 使用正确的选项和 (2) 在获得下一个组之前清除 groupId 字段。
我有一个需求,当消费者宕机时在队列中存储大量消息,并在消费者恢复时按顺序处理它们。我想将逻辑上相关的消息组合在一起并并行执行所有组。
例如:考虑下面有 3 个组 A、B、C 的队列。我已将 seqnum 分配给内容,并将唯一的 groupId 正确分配给组。 队列 1 A1,A2,A3-last,B1,B2-last,C1,C2,c3,C4-last
- 是否可以并行获取A1、B1、C1?
- 此外,目前消费者正在按顺序正确获取 A1、A2 和 A3。但是它无法从 B 组或 C 组中获取任何内容。可能是哪里出了问题?
任何建议都会有很大帮助。
首先阅读来自 MQ 知识的 page。
我将伪代码(从上面的网页)转换为 MQ 类 for Java,并将其从浏览更改为破坏性获取。
此外,我更喜欢在同步点下处理每组消息(假设组的大小合理)。
MQGetMessageOptions gmo = new MQGetMessageOptions();
MQMessage rcvMsg = new MQMessage();
/* Get the first message in a group, or a message not in a group */
gmo.Options = CMQC.MQGMO_COMPLETE_MSG | CMQC.MQGMO_LOGICAL_ORDER | CMQC.MQGMO_ALL_MSGS_AVAILABLE | CMQC.MQGMO_WAIT | CMQC.MQGMO_SYNCPOINT;
gmo.MatchOptions = CMQC.MQMO_MATCH_MSG_SEQ_NUMBER;
rcvMsg.messageSequenceNumber = 1;
rcvMsg.groupId = CMQC.MQGI_NONE;
inQ.get(rcvMsg, gmo);
/* Examine first or only message */
...
gmo.Options = CMQC.MQGMO_COMPLETE_MSG | CMQC.MQGMO_LOGICAL_ORDER | CMQC.MQGMO_SYNCPOINT;
do while ((rcvMsg.messageFlags & CMQC.MQMF_MSG_IN_GROUP) == CMQC.MQMF_MSG_IN_GROUP)
{
rcvMsg.clearMessage();
inQ.get(rcvMsg, gmo);
/* Examine each remaining message in the group */
...
}
qMgr.commit();
必须完成一些编码工作:(1) 使用正确的选项和 (2) 在获得下一个组之前清除 groupId 字段。