连接到多个队列并将消息路由到另一个队列的最有效方法

Most efficient way to connect to multiple queues and route their messages to another queue

我正在使用 IBM 的 Websphere MQ 队列,我想连接到其中的多个队列,浏览每个消息,然后将它们全部路由到另一个队列,不同的应用程序将从中读取消息。

从 "main" 队列中读取的应用程序是在 Java 中编写的,并使用 Spring Boot 和 JMS 来处理来自队列的消息。我发现很难用这个应用程序连接到多个队列,所以我的想法是创建一个路由器应用程序,它将连接到多个队列,清除 "main" 队列,并用收集到的队列中的消息填充它。

我也在 Java 中开始了 "router" 应用程序,并决定使用 Spring Boot,而不是 JMS,并且在概念上仍然发现一些困难。

所以我想知道 Java 是否适合这份工作(或者我可能遗漏了一些基本知识)。我也在考虑 Perl 脚本,但我对这门语言不是很了解(如果有的话),但我不想诋毁它作为一个可行的选择。

那么这两种语言中的哪一种可以证明是:

1) 可维护

2) 比较容易阅读

3) 高效

在完成我所描述的任务时?

两者各有利弊。

Java中最先进的EIP集成框架是Apache Camel。我建议使用它。使用 Apache Camel,您可以以类似于以下的形式编写路由器:

public class Routes extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        from("jms:InputOneQueue1").to("jms:destinationQueue");
        from("jms:InputOneQueue2").to("jms:destinationQueue");
        from("jms:InputOneQueue3").to("jms:destinationQueue");
    }
}

Apache Camel 看起来很有前途,但与我设置 Spring Boot 应用程序的方式不太吻合。我最终使用了 IBM 的 MQ 库。

我基本上是在设置队列后使用这些选项

    putMsgOpts = new MQPutMessageOptions();

    getFirstMsgOpts = new MQGetMessageOptions();
    getFirstMsgOpts.options = MQConstants.MQGMO_BROWSE_FIRST;

    getNextMsgOpts = new MQGetMessageOptions();
    getNextMsgOpts.options = MQConstants.MQGMO_BROWSE_NEXT;

那就用我的方法

public void transferQueue(MQPropsManager q1) {
    String message = "";
    message = readFromQueue1(q1, getFirstMsgOpts);
    message = verifyMessage(message);   // just a check for empty or null
    writeToQueue2(message);
    q1.decrementMessagesLeftToProcess();  // decrement initial queue depth 

    while (q1.getMessagesLeftToProcess() > 0) {
        message = readFromQueue1(q1, getNextMsgOpts);
        message = verifyMessage(message);
        writeToQueue2(message);
        q1.decrementMessagesLeftToProcess();
    }
    closeQueue(q1);
}

然后是它调用的两个方法:

public String readFromQueue1(MQPropsManager q1,
            MQGetMessageOptions getMsgOpts) {
        MQMessage msg = new MQMessage();
        String message = "";
        try {
            q1.getQueue().get(msg, getMsgOpts);
            message = msg.readStringOfCharLength(msg.getMessageLength());
        } catch (IOException ioe) {
        //  Failed to read string retreived from queue: q1.getQueueName()
        } catch (MQException mqe) {
        //  Failed to retreive message from queue: q1.getQueueName()
        }
        return message;
    }

public void writeToQueue2(String message) {
        MQMessage mqMessage = new MQMessage();
        mqMessage.format = MQConstants.MQFMT_STRING;
        mqMessage.messageType = MQConstants.MQMT_DATAGRAM;

        try {
            mqMessage.writeString(message);
            q2.getQueue().put(mqMessage, putMsgOpts);
        } catch (IOException ioe) {
        //  Failed to write message: message 
        } catch (MQException mqe) {
        //  Failed to put message: message on to the queue2
        }
    }