收到消息后关闭 JMS 会话和连接

Close JMS session and connection after message received

我有一个有状态会话 bean,我可以在其中发送和接收 JMS 消息。所有连接设置都是手动处理的,因此 bean 持有 javax.jms.connection 和 javax.jms.session 的实例。该 bean 还实现了 MessageListener 以便能够接收消息。

现在,当我发送消息时,我使用 session.createTemporaryQueue() 创建了一个临时队列。我将 message.setJMSReplyTo() 设置为同一个临时队列,最后创建该队列的使用者并将 MessageListener 设置为实现所有这些的同一个有状态会话 bean。

我很乐意将消息发送到 onMessage() 方法。但是,我想在收到消息后立即关闭会话和连接,这在 onMessage() 方法中显然是不允许的。

所以问题是: 收到消息后如何关闭会话和连接?我必须手动处理连接设置,不能使用 MDB。

注意: 这是在 Java EE 环境 (GlassFish 4.0)

中执行的

编辑:

import javax.ejb.LocalBean;
import javax.ejb.Stateful;
import javax.inject.Inject;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import com.sun.messaging.ConnectionConfiguration;
import com.sun.messaging.QueueConnectionFactory;

@LocalBean
@Stateful
public class OpenMqClient implements MessageListener{
    private Connection connection;
    private Session session;
    private MessageConsumer responseConsumer;

    public OpenMqClient(){}

    public void sendMessage(String messageContent, String jmsBrokerUri, String queueName) {
        try{
            String host = System.getProperty("foo", jmsBrokerUri);
            QueueConnectionFactory cf = new QueueConnectionFactory();
            cf.setProperty(ConnectionConfiguration.imqAddressList, host);
            connection = null;
            session = null;

            //Setup connection
            connection = cf.createConnection();
            connection.start();
            session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

            //Setup queue and producer
            Queue queue = session.createQueue(queueName);
            MessageProducer producer = session.createProducer(queue);


            //Reply destination
            Queue responseQueue = session.createTemporaryQueue();
            responseConsumer = session.createConsumer(responseQueue);
            responseConsumer.setMessageListener(this);

            //Create message
            TextMessage textMessage = session.createTextMessage();
            textMessage.setJMSReplyTo(responseQueue);
            textMessage.setJMSCorrelationID("test0101");
            textMessage.setText(messageContent);

            producer.send(textMessage);
            System.out.println("Message sent");
        } catch (JMSException e) {
            e.printStackTrace();
            System.out.println("JMSException in Sender");
        }
    }

    @Override
    public void onMessage(Message arg0) {
        //On this event I want to close the session and connection, but it's not permitted
    }

}

就我个人而言,这就是我的做法(请注意,我没有测试或向这段代码添加太多错误处理)。

  1. 将连接设置为静态 - 您可以(可能应该)为所有 bean 重用相同的连接,除非您有特殊原因不这样做
  2. 在新线程中关闭会话

    public class OpenMqClient implements MessageListener {
    
        private static Connection connection;
        private static final String mutex = "mutex"; 
        private Session session;
        private MessageConsumer responseConsumer;
    
        public OpenMqClient() {
            if(connection == null) {
                synchronized(mutex) {
                    if(connection == null) {
                        String host = System.getProperty("foo", jmsBrokerUri);
                        QueueConnectionFactory cf = new QueueConnectionFactory();
                        cf.setProperty(ConnectionConfiguration.imqAddressList, host);
    
                        // Setup connection
                        connection = cf.createConnection();
                        connection.start();
                    }
                }
            }
        }
    
        public void sendMessage(String messageContent, String jmsBrokerUri, String queueName) {
            try {
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
                // Setup queue and producer
                Queue queue = session.createQueue(queueName);
                MessageProducer producer = session.createProducer(queue);
    
                // Reply destination
                Queue responseQueue = session.createTemporaryQueue();
                responseConsumer = session.createConsumer(responseQueue);
                responseConsumer.setMessageListener(this);
    
                // Create message
                TextMessage textMessage = session.createTextMessage();
                textMessage.setJMSReplyTo(responseQueue);
                textMessage.setJMSCorrelationID("test0101");
                textMessage.setText(messageContent);
    
                producer.send(textMessage);
                System.out.println("Message sent");
            } catch (JMSException e) {
                e.printStackTrace();
                System.out.println("JMSException in Sender");
            }
        }
    
        @Override
        public void onMessage(Message arg0) {
            // do stuff
            new Thread(
                new Runnable() {
                    @Override
                    public void run() {
                        if(session != null)
                            try {
                                session.close();
                            } catch (JMSException e) {
                                e.printStackTrace();
                            }               
                    }
                }
            ).start();
        }
    }