JMSCode使用Database一一收发消息
JMSCode to send and receive messages one by one using Database
在我去了多个站点并学习了 JMS 之后,我编写了一个 JMS 独立客户端来从数据库中读取消息并一条一条地发送它们。我也想一条一条地接收消息,然后更新数据库。
我需要使用标准 JMS 将消息发送到队列和其他应用程序,该应用程序将使用 TextMessage
并且其正文将被读取为 ISO-8859-1 字符串。他们同样会以 TextMessage
的形式发送回复。
我写了一个 for
循环来从数据库中一条一条地读取消息。
我是 JMS 的新手,请您指正我的以下代码是否可以正常读取消息并将消息发送到队列以及接收和更新数据库。 JMS 类型中是否需要更改任何内容或需要更正的任何内容。 for
循环工作正常吗?
/*MQ Configuration*/
MQQueueConnectionFactory mqQueueConnectionFactory = new MQQueueConnectionFactory();
mqQueueConnectionFactory.setHostName(url);
mqQueueConnectionFactory.setChannel(channel);//communications link
mqQueueConnectionFactory.setPort(port);
mqQueueConnectionFactory.setQueueManager(qmgr);//service provider
mqQueueConnectionFactory.setTransportType(JMSC.MQJMS_TP_CLIENT_MQ_TCPIP);
/*Create Connection */
QueueConnection queueConnection = mqQueueConnectionFactory.createQueueConnection();
queueConnection.start();
/*Create session */
QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
/*Create response queue */
// Queue queue = queueSession.createQueue("QUEUE.RESPONSE");
int messageCount = 0;
Queue queue = queueSession.createQueue(replytoQueueName);
QueueSender queueSender = null;
QueueReceiver queueReceiver=null;
for (Testbean testBean : testbeanList) {
String testMessage = testBean.getMessage();
/*Create text message */
textMessage = queueSession.createTextMessage(testMessage);
logger.info("Text messages sent: " + messageCount);
textMessage.setJMSReplyTo(queue);
textMessage.setJMSType("mcd://xmlns");//message type
textMessage.setJMSExpiration(2*1000);//message expiration
textMessage.setJMSDeliveryMode(DeliveryMode.PERSISTENT); //message delivery mode either persistent or non-persistemnt
/*Create sender queue */
// QueueSender queueSender = queueSession.createSender(queueSession.createQueue("QUEUE.REQEST"));
queueSender = queueSession.createSender(queueSession.createQueue(outputQName));
queueSender.setTimeToLive(2*1000);
queueSender.send(textMessage);
/*After sending a message we get message id */
System.out.println("after sending a message we get message id "+ textMessage.getJMSMessageID());
String jmsCorrelationID = " JMSCorrelationID = '" + textMessage.getJMSMessageID() + "'";
/*Within the session we have to create queue reciver */
queueReceiver = queueSession.createReceiver(queue,jmsCorrelationID);
/*Receive the message from*/
Message message = queueReceiver.receive(60*1000);
// String responseMsg = ((TextMessage) message).getText();
byte[] by = ((TextMessage) message).getText().getBytes("ISO-8859-1");
logger.info(new String(by));
String responseMsg = new String(by,"UTF-8");
testDAO rmdao = new testDAO();
rmdao.updateTest(responseMsg, jmsCorrelationID);
messageCount += 1;
}
queueSender.close();
queueReceiver.close();
queueSession.close();
queueConnection.close();
两件事:
- 我会创建你的
QueueSender
和 Queue
反对它发送消息到 外部 for
循环,因为它们没有出现正在改变。
- 如果没有相应的消费者代码,最终不可能判断选择器是否有效,但是 not 在您发送的消息上调用
setCorrelationID()
看起来有点奇怪大部头书。使用提供者分配的消息 ID 可能是 IBM MQ request/reply 应用程序的常见模式,但使用相关 ID 的一般模式是对发送的消息调用 setJMSCorrelationID()
。这使得代码更加清晰,也允许应用程序直接控制关联 ID 的唯一性。这对于应用程序的可移植性可能很重要(例如,如果您从 IBM MQ 迁移到不同的 JMS 提供者),因为不同的 JMS 提供者使用 styles/formats 特定于其特定实现的消息 ID。此外,关于 JMS 规范规定的消息 ID,"The exact scope of uniqueness is provider defined," 在我看来不足以保证唯一性,尤其是在使用 java.util.UUID.randomUUID().toString()
之类的东西时是如此简单。
- 您应该确保为 JMS 和数据库工作使用 XA 事务,以便它们是原子的。
- 在
finally
块中关闭您的 JMS 资源。
在我去了多个站点并学习了 JMS 之后,我编写了一个 JMS 独立客户端来从数据库中读取消息并一条一条地发送它们。我也想一条一条地接收消息,然后更新数据库。
我需要使用标准 JMS 将消息发送到队列和其他应用程序,该应用程序将使用 TextMessage
并且其正文将被读取为 ISO-8859-1 字符串。他们同样会以 TextMessage
的形式发送回复。
我写了一个 for
循环来从数据库中一条一条地读取消息。
我是 JMS 的新手,请您指正我的以下代码是否可以正常读取消息并将消息发送到队列以及接收和更新数据库。 JMS 类型中是否需要更改任何内容或需要更正的任何内容。 for
循环工作正常吗?
/*MQ Configuration*/
MQQueueConnectionFactory mqQueueConnectionFactory = new MQQueueConnectionFactory();
mqQueueConnectionFactory.setHostName(url);
mqQueueConnectionFactory.setChannel(channel);//communications link
mqQueueConnectionFactory.setPort(port);
mqQueueConnectionFactory.setQueueManager(qmgr);//service provider
mqQueueConnectionFactory.setTransportType(JMSC.MQJMS_TP_CLIENT_MQ_TCPIP);
/*Create Connection */
QueueConnection queueConnection = mqQueueConnectionFactory.createQueueConnection();
queueConnection.start();
/*Create session */
QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
/*Create response queue */
// Queue queue = queueSession.createQueue("QUEUE.RESPONSE");
int messageCount = 0;
Queue queue = queueSession.createQueue(replytoQueueName);
QueueSender queueSender = null;
QueueReceiver queueReceiver=null;
for (Testbean testBean : testbeanList) {
String testMessage = testBean.getMessage();
/*Create text message */
textMessage = queueSession.createTextMessage(testMessage);
logger.info("Text messages sent: " + messageCount);
textMessage.setJMSReplyTo(queue);
textMessage.setJMSType("mcd://xmlns");//message type
textMessage.setJMSExpiration(2*1000);//message expiration
textMessage.setJMSDeliveryMode(DeliveryMode.PERSISTENT); //message delivery mode either persistent or non-persistemnt
/*Create sender queue */
// QueueSender queueSender = queueSession.createSender(queueSession.createQueue("QUEUE.REQEST"));
queueSender = queueSession.createSender(queueSession.createQueue(outputQName));
queueSender.setTimeToLive(2*1000);
queueSender.send(textMessage);
/*After sending a message we get message id */
System.out.println("after sending a message we get message id "+ textMessage.getJMSMessageID());
String jmsCorrelationID = " JMSCorrelationID = '" + textMessage.getJMSMessageID() + "'";
/*Within the session we have to create queue reciver */
queueReceiver = queueSession.createReceiver(queue,jmsCorrelationID);
/*Receive the message from*/
Message message = queueReceiver.receive(60*1000);
// String responseMsg = ((TextMessage) message).getText();
byte[] by = ((TextMessage) message).getText().getBytes("ISO-8859-1");
logger.info(new String(by));
String responseMsg = new String(by,"UTF-8");
testDAO rmdao = new testDAO();
rmdao.updateTest(responseMsg, jmsCorrelationID);
messageCount += 1;
}
queueSender.close();
queueReceiver.close();
queueSession.close();
queueConnection.close();
两件事:
- 我会创建你的
QueueSender
和Queue
反对它发送消息到 外部for
循环,因为它们没有出现正在改变。 - 如果没有相应的消费者代码,最终不可能判断选择器是否有效,但是 not 在您发送的消息上调用
setCorrelationID()
看起来有点奇怪大部头书。使用提供者分配的消息 ID 可能是 IBM MQ request/reply 应用程序的常见模式,但使用相关 ID 的一般模式是对发送的消息调用setJMSCorrelationID()
。这使得代码更加清晰,也允许应用程序直接控制关联 ID 的唯一性。这对于应用程序的可移植性可能很重要(例如,如果您从 IBM MQ 迁移到不同的 JMS 提供者),因为不同的 JMS 提供者使用 styles/formats 特定于其特定实现的消息 ID。此外,关于 JMS 规范规定的消息 ID,"The exact scope of uniqueness is provider defined," 在我看来不足以保证唯一性,尤其是在使用java.util.UUID.randomUUID().toString()
之类的东西时是如此简单。 - 您应该确保为 JMS 和数据库工作使用 XA 事务,以便它们是原子的。
- 在
finally
块中关闭您的 JMS 资源。