从 Java 发出读取来自 MQ 的所有消息
Issue reading all the messages from MQ from Java
我开发了一个 Java 应用程序来读取来自 MQ 的消息。 Java 应用程序必须从 MQ 读取所有消息,将它们放入列表和 return 列表中。我正在使用 while
循环一条一条地读取消息,如果我捕获到 2033 异常和 return 列表,则会中断。
我的问题是我在阅读单条消息后收到 2033 异常。例如,我已将大约 10 条消息推送到队列和 运行 我的应用程序,第一个循环读取第一条消息并将其放入列表中。但是在第二个循环中,它在获取第二条消息时抛出 2033 异常。然后我需要 运行 应用程序来读取第二条消息,同样的事情发生了。
由于我是 MQ 的新手,我找不到导致它发生的原因。我正在使用 Java8 和 IBM MQ 核心库。下面是我的代码..
package com.reciever.mq;
import com.ibm.mq.*;
import com.ibm.mq.constants.MQConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.*;
public class MQReceiver {
private static final Logger LOGGER = LoggerFactory.getLogger(MQReceiver.class);
private static int GET_OPTIONS_CONSTANT = MQConstants.MQGMO_WAIT |
MQConstants.MQGMO_PROPERTIES_COMPATIBILITY |
MQConstants.MQGMO_ALL_SEGMENTS_AVAILABLE |
MQConstants.MQGMO_COMPLETE_MSG |
MQConstants.MQGMO_ALL_MSGS_AVAILABLE |
MQConstants.MQGMO_SYNCPOINT;
public static MQQueueManager queueManager;
public static void main(String[] args) throws MQException {
Hashtable<String, Object> mqOptions = new Hashtable<>();
mqOptions.put(MQConstants.HOST_NAME_PROPERTY, "host.name"); //just a placeHolder
mqOptions.put(MQConstants.CHANNEL_PROPERTY, "channelName");
mqOptions.put(MQConstants.USER_ID_PROPERTY, "userName");
mqOptions.put(MQConstants.TRANSPORT_PROPERTY, MQConstants.TRANSPORT_MQSERIES);
mqOptions.put(MQConstants.PORT_PROPERTY, 1980);
queueManager = new MQQueueManager("queueManager", mqOptions);
int options = MQConstants.MQOO_INPUT_AS_Q_DEF | MQConstants.MQOO_OUTPUT
| MQConstants.MQOO_FAIL_IF_QUIESCING | MQConstants.MQOO_PASS_ALL_CONTEXT | MQConstants.MQOO_INQUIRE;
MQQueue mq = queueManager.accessQueue("queueNametoRead", options);
MQException.logExclude(2033);
MQMessage mqMessage = new MQMessage();
MQGetMessageOptions getOptions = new MQGetMessageOptions();
getOptions.options = GET_OPTIONS_CONSTANT;
getOptions.waitInterval = 1;
//byte[] buffer = null;
List<byte[]> msgList = new ArrayList<>();
System.out.println("Current depth : " + mq.getCurrentDepth());
boolean hasMsges = true;
while(hasMsges){
byte[] buffer = null;
try{
mq.get(mqMessage, getOptions);
buffer = new byte[mqMessage.getDataLength()];
mqMessage.readFully(buffer);
queueManager.commit();
System.out.println("Recieved Message....");
msgList.add(buffer);
} catch (MQException e) {
if((e.completionCode == MQConstants.MQCC_FAILED) &&
(e.reasonCode == MQConstants.MQRC_NO_MSG_AVAILABLE)){
System.out.println(" No more messages.. : " + e.getReason());
break;
}else if (e.getReason() != 2033) {
LOGGER.error("Error getting message from the queue: " + "", e);
}
} catch (IOException e) {
LOGGER.error("Error reading the message from the queue: " + "", e);
}
/* finally {
if(mq != null) mq.close();
if(queueManager != null) queueManager.disconnect();
}*/
}
System.out.println("Final List size : " + msgList.size());
}
}
下面是我在运行完成程序
后的系统输出
Current depth : 9
Recieved Message...
No more messages.. : 2033
Final List size : 1
从上面的输出来看,它打印的当前深度是9,但是在第二次循环的时候就爆发了。
消息 ID 和相关 ID 字段在从队列中获取消息时填充。如果您使用相同的 MQMessage
来获取下一条消息,那么您必须先重置这两个字段。所以就在调用 get
方法之前添加这两行。
mqMessage.messageId = MQConstants.MQMI_NONE;
mqMessage.correlationId = MQConstants.MQCI_NONE;
您还可以在循环内移动 MQMessage 对象的创建
while(hasMsges){
byte[] buffer = null;
try{
MQMessage mqMessage = new mqMessage();
mq.get(mqMessage, getOptions);
这也有助于解决问题
我开发了一个 Java 应用程序来读取来自 MQ 的消息。 Java 应用程序必须从 MQ 读取所有消息,将它们放入列表和 return 列表中。我正在使用 while
循环一条一条地读取消息,如果我捕获到 2033 异常和 return 列表,则会中断。
我的问题是我在阅读单条消息后收到 2033 异常。例如,我已将大约 10 条消息推送到队列和 运行 我的应用程序,第一个循环读取第一条消息并将其放入列表中。但是在第二个循环中,它在获取第二条消息时抛出 2033 异常。然后我需要 运行 应用程序来读取第二条消息,同样的事情发生了。
由于我是 MQ 的新手,我找不到导致它发生的原因。我正在使用 Java8 和 IBM MQ 核心库。下面是我的代码..
package com.reciever.mq;
import com.ibm.mq.*;
import com.ibm.mq.constants.MQConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.*;
public class MQReceiver {
private static final Logger LOGGER = LoggerFactory.getLogger(MQReceiver.class);
private static int GET_OPTIONS_CONSTANT = MQConstants.MQGMO_WAIT |
MQConstants.MQGMO_PROPERTIES_COMPATIBILITY |
MQConstants.MQGMO_ALL_SEGMENTS_AVAILABLE |
MQConstants.MQGMO_COMPLETE_MSG |
MQConstants.MQGMO_ALL_MSGS_AVAILABLE |
MQConstants.MQGMO_SYNCPOINT;
public static MQQueueManager queueManager;
public static void main(String[] args) throws MQException {
Hashtable<String, Object> mqOptions = new Hashtable<>();
mqOptions.put(MQConstants.HOST_NAME_PROPERTY, "host.name"); //just a placeHolder
mqOptions.put(MQConstants.CHANNEL_PROPERTY, "channelName");
mqOptions.put(MQConstants.USER_ID_PROPERTY, "userName");
mqOptions.put(MQConstants.TRANSPORT_PROPERTY, MQConstants.TRANSPORT_MQSERIES);
mqOptions.put(MQConstants.PORT_PROPERTY, 1980);
queueManager = new MQQueueManager("queueManager", mqOptions);
int options = MQConstants.MQOO_INPUT_AS_Q_DEF | MQConstants.MQOO_OUTPUT
| MQConstants.MQOO_FAIL_IF_QUIESCING | MQConstants.MQOO_PASS_ALL_CONTEXT | MQConstants.MQOO_INQUIRE;
MQQueue mq = queueManager.accessQueue("queueNametoRead", options);
MQException.logExclude(2033);
MQMessage mqMessage = new MQMessage();
MQGetMessageOptions getOptions = new MQGetMessageOptions();
getOptions.options = GET_OPTIONS_CONSTANT;
getOptions.waitInterval = 1;
//byte[] buffer = null;
List<byte[]> msgList = new ArrayList<>();
System.out.println("Current depth : " + mq.getCurrentDepth());
boolean hasMsges = true;
while(hasMsges){
byte[] buffer = null;
try{
mq.get(mqMessage, getOptions);
buffer = new byte[mqMessage.getDataLength()];
mqMessage.readFully(buffer);
queueManager.commit();
System.out.println("Recieved Message....");
msgList.add(buffer);
} catch (MQException e) {
if((e.completionCode == MQConstants.MQCC_FAILED) &&
(e.reasonCode == MQConstants.MQRC_NO_MSG_AVAILABLE)){
System.out.println(" No more messages.. : " + e.getReason());
break;
}else if (e.getReason() != 2033) {
LOGGER.error("Error getting message from the queue: " + "", e);
}
} catch (IOException e) {
LOGGER.error("Error reading the message from the queue: " + "", e);
}
/* finally {
if(mq != null) mq.close();
if(queueManager != null) queueManager.disconnect();
}*/
}
System.out.println("Final List size : " + msgList.size());
}
}
下面是我在运行完成程序
后的系统输出Current depth : 9
Recieved Message...
No more messages.. : 2033
Final List size : 1
从上面的输出来看,它打印的当前深度是9,但是在第二次循环的时候就爆发了。
消息 ID 和相关 ID 字段在从队列中获取消息时填充。如果您使用相同的 MQMessage
来获取下一条消息,那么您必须先重置这两个字段。所以就在调用 get
方法之前添加这两行。
mqMessage.messageId = MQConstants.MQMI_NONE;
mqMessage.correlationId = MQConstants.MQCI_NONE;
您还可以在循环内移动 MQMessage 对象的创建
while(hasMsges){
byte[] buffer = null;
try{
MQMessage mqMessage = new mqMessage();
mq.get(mqMessage, getOptions);
这也有助于解决问题