连接到多个队列并将消息路由到另一个队列的最有效方法
Most efficient way to connect to multiple queues and route their messages to another queue
我正在使用 IBM 的 Websphere MQ 队列,我想连接到其中的多个队列,浏览每个消息,然后将它们全部路由到另一个队列,不同的应用程序将从中读取消息。
从 "main" 队列中读取的应用程序是在 Java 中编写的,并使用 Spring Boot 和 JMS 来处理来自队列的消息。我发现很难用这个应用程序连接到多个队列,所以我的想法是创建一个路由器应用程序,它将连接到多个队列,清除 "main" 队列,并用收集到的队列中的消息填充它。
我也在 Java 中开始了 "router" 应用程序,并决定使用 Spring Boot,而不是 JMS,并且在概念上仍然发现一些困难。
所以我想知道 Java 是否适合这份工作(或者我可能遗漏了一些基本知识)。我也在考虑 Perl 脚本,但我对这门语言不是很了解(如果有的话),但我不想诋毁它作为一个可行的选择。
那么这两种语言中的哪一种可以证明是:
1) 可维护
2) 比较容易阅读
3) 高效
在完成我所描述的任务时?
两者各有利弊。
Java中最先进的EIP集成框架是Apache Camel。我建议使用它。使用 Apache Camel,您可以以类似于以下的形式编写路由器:
public class Routes extends RouteBuilder {
@Override
public void configure() throws Exception {
from("jms:InputOneQueue1").to("jms:destinationQueue");
from("jms:InputOneQueue2").to("jms:destinationQueue");
from("jms:InputOneQueue3").to("jms:destinationQueue");
}
}
Apache Camel 看起来很有前途,但与我设置 Spring Boot 应用程序的方式不太吻合。我最终使用了 IBM 的 MQ 库。
我基本上是在设置队列后使用这些选项
putMsgOpts = new MQPutMessageOptions();
getFirstMsgOpts = new MQGetMessageOptions();
getFirstMsgOpts.options = MQConstants.MQGMO_BROWSE_FIRST;
getNextMsgOpts = new MQGetMessageOptions();
getNextMsgOpts.options = MQConstants.MQGMO_BROWSE_NEXT;
那就用我的方法
public void transferQueue(MQPropsManager q1) {
String message = "";
message = readFromQueue1(q1, getFirstMsgOpts);
message = verifyMessage(message); // just a check for empty or null
writeToQueue2(message);
q1.decrementMessagesLeftToProcess(); // decrement initial queue depth
while (q1.getMessagesLeftToProcess() > 0) {
message = readFromQueue1(q1, getNextMsgOpts);
message = verifyMessage(message);
writeToQueue2(message);
q1.decrementMessagesLeftToProcess();
}
closeQueue(q1);
}
然后是它调用的两个方法:
public String readFromQueue1(MQPropsManager q1,
MQGetMessageOptions getMsgOpts) {
MQMessage msg = new MQMessage();
String message = "";
try {
q1.getQueue().get(msg, getMsgOpts);
message = msg.readStringOfCharLength(msg.getMessageLength());
} catch (IOException ioe) {
// Failed to read string retreived from queue: q1.getQueueName()
} catch (MQException mqe) {
// Failed to retreive message from queue: q1.getQueueName()
}
return message;
}
public void writeToQueue2(String message) {
MQMessage mqMessage = new MQMessage();
mqMessage.format = MQConstants.MQFMT_STRING;
mqMessage.messageType = MQConstants.MQMT_DATAGRAM;
try {
mqMessage.writeString(message);
q2.getQueue().put(mqMessage, putMsgOpts);
} catch (IOException ioe) {
// Failed to write message: message
} catch (MQException mqe) {
// Failed to put message: message on to the queue2
}
}
我正在使用 IBM 的 Websphere MQ 队列,我想连接到其中的多个队列,浏览每个消息,然后将它们全部路由到另一个队列,不同的应用程序将从中读取消息。
从 "main" 队列中读取的应用程序是在 Java 中编写的,并使用 Spring Boot 和 JMS 来处理来自队列的消息。我发现很难用这个应用程序连接到多个队列,所以我的想法是创建一个路由器应用程序,它将连接到多个队列,清除 "main" 队列,并用收集到的队列中的消息填充它。
我也在 Java 中开始了 "router" 应用程序,并决定使用 Spring Boot,而不是 JMS,并且在概念上仍然发现一些困难。
所以我想知道 Java 是否适合这份工作(或者我可能遗漏了一些基本知识)。我也在考虑 Perl 脚本,但我对这门语言不是很了解(如果有的话),但我不想诋毁它作为一个可行的选择。
那么这两种语言中的哪一种可以证明是:
1) 可维护
2) 比较容易阅读
3) 高效
在完成我所描述的任务时?
两者各有利弊。
Java中最先进的EIP集成框架是Apache Camel。我建议使用它。使用 Apache Camel,您可以以类似于以下的形式编写路由器:
public class Routes extends RouteBuilder {
@Override
public void configure() throws Exception {
from("jms:InputOneQueue1").to("jms:destinationQueue");
from("jms:InputOneQueue2").to("jms:destinationQueue");
from("jms:InputOneQueue3").to("jms:destinationQueue");
}
}
Apache Camel 看起来很有前途,但与我设置 Spring Boot 应用程序的方式不太吻合。我最终使用了 IBM 的 MQ 库。
我基本上是在设置队列后使用这些选项
putMsgOpts = new MQPutMessageOptions();
getFirstMsgOpts = new MQGetMessageOptions();
getFirstMsgOpts.options = MQConstants.MQGMO_BROWSE_FIRST;
getNextMsgOpts = new MQGetMessageOptions();
getNextMsgOpts.options = MQConstants.MQGMO_BROWSE_NEXT;
那就用我的方法
public void transferQueue(MQPropsManager q1) {
String message = "";
message = readFromQueue1(q1, getFirstMsgOpts);
message = verifyMessage(message); // just a check for empty or null
writeToQueue2(message);
q1.decrementMessagesLeftToProcess(); // decrement initial queue depth
while (q1.getMessagesLeftToProcess() > 0) {
message = readFromQueue1(q1, getNextMsgOpts);
message = verifyMessage(message);
writeToQueue2(message);
q1.decrementMessagesLeftToProcess();
}
closeQueue(q1);
}
然后是它调用的两个方法:
public String readFromQueue1(MQPropsManager q1,
MQGetMessageOptions getMsgOpts) {
MQMessage msg = new MQMessage();
String message = "";
try {
q1.getQueue().get(msg, getMsgOpts);
message = msg.readStringOfCharLength(msg.getMessageLength());
} catch (IOException ioe) {
// Failed to read string retreived from queue: q1.getQueueName()
} catch (MQException mqe) {
// Failed to retreive message from queue: q1.getQueueName()
}
return message;
}
public void writeToQueue2(String message) {
MQMessage mqMessage = new MQMessage();
mqMessage.format = MQConstants.MQFMT_STRING;
mqMessage.messageType = MQConstants.MQMT_DATAGRAM;
try {
mqMessage.writeString(message);
q2.getQueue().put(mqMessage, putMsgOpts);
} catch (IOException ioe) {
// Failed to write message: message
} catch (MQException mqe) {
// Failed to put message: message on to the queue2
}
}