初始化异步 JMS 侦听器并让它无限 运行 的正确方法

Correct approach to initialize an asynchronous JMS Listener and let it run infinitely

我使用以下应用程序连接到我的 oracle 数据库,注册一个队列侦听器并等待排队的消息:

package sample;

import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;

import oracle.jms.AQjmsFactory;
import oracle.jms.AQjmsSession;

public class MyThread extends Thread {

    private static final String QUEUE_NAME = "MY_QUEUE";
    private static final String QUEUE_USER = "myuser";
    private static final String QUEUE_PW = "mypassword";
    private boolean running;

    public MyThread() {
        this.running = true;
    }

    public static void main(String[] args) {
        MyThread mt = new MyThread();
        mt.start();
    }

    private QueueConnection getQueueConnection() throws JMSException {
        QueueConnectionFactory QFac = AQjmsFactory.getQueueConnectionFactory("xxx.xxx.xxx.xxx", "orcl", 1521, "thin");
        QueueConnection QCon = QFac.createQueueConnection(QUEUE_USER, QUEUE_PW);
        return QCon;
    }

    @Override
    public void interrupt() {
        this.running = false;
        super.interrupt();
    }

    @Override
    public void run() {
        try {
            QueueConnection queueConnection = getQueueConnection();
            QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue queue = ((AQjmsSession) queueSession).getQueue(QUEUE_USER, QUEUE_NAME);

            while (running) {
                System.out.println("Starting...");

                queueConnection.start();
                MessageConsumer mq = ((AQjmsSession) queueSession).createReceiver(queue);
                MyListener listener = new MyListener();
                mq.setMessageListener(listener);

                System.out.println("... Done, now sleep a bit and redo");

                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            System.out.println("Closing Application");
            queueSession.close();
            queueConnection.close();

        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

消息入队后,onMessage 函数会将消息附加到文本文件内容中:

package sample;

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;

import javax.jms.Message;
import javax.jms.MessageListener;

public class MyListener implements MessageListener{

    @Override
    public void onMessage(Message arg0) {
        try {
            PrintWriter out = new PrintWriter(new BufferedWriter(new FileWriter("C:/temp/output/messages.txt", true)));
            out.println("New Message arrived");
            out.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

在运行时,我的控制台输出如下所示:

Starting...
... Done, now sleep a bit an redo
Starting...
... Done, now sleep a bit an redo
Starting...
... Done, now sleep a bit an redo
Starting...
... Done, now sleep a bit an redo

如果有任何排队文件将包含新消息。
所以:我可以使事件出队,并且 onMessage 事件将被这段代码触发。

现在回答我的问题:我很确定等待 5 秒再次注册监听器(并调用 queueConnection.start() 来接收 onMessage 调用)是不是正确的方法。但如果我不这样做,就不会有任何 onMessage 事件(文件保持为空)。

在没有固定 Thread.sleep() 调用且无需再次注册侦听器(即使没有任何事件)的情况下开始无限侦听队列的正确方法是什么?

附加信息

数据库:Oracle 11g2
Java 运行时间:1.6
Maven 依赖项:

没有理由 运行 线程创建 JMS 使用者并设置其消息侦听器。 JMS 消息侦听器的全部要点是异步接收消息(您似乎出于某种原因试图复制的功能)。

您只需创建 JMS 消费者并设置消息侦听器,然后确保消费者未关闭。根据应用程序的编写方式,有时需要 while 循环以确保程序不会终止并因此关闭使用者。你的线程没有这样做。它让消费者在等待消息 5 秒后超出范围,这意味着它将被垃圾收集,我希望对于大多数 JMS 实现来说这意味着它将被关闭。不过,情况可能比这更糟。通过不显式关闭消费者并让它超出范围,您可能会泄漏消费者,最终会使您的消息代理陷入困境。这不仅是一种草率的编程,而且对于其他试图使用消息的用户来说可能存在问题。