如何在 ActiveMQ 5.x 中启动线程化的会话

How do I start a threaded session in ActiveMQ 5.x?

我想这是一个基础的常识性问题:下面的 current_session 是否是以线程方式运行的?

// Create a session on the existing connection: first argument false (non-transacted),
// acknowledgement mode AUTO_ACKNOWLEDGE, then downcast to the ActiveMQ implementation type.
ActiveMQSession current_session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

// Pass the session object itself to a new Thread and start that thread.
// NOTE(review): this does not by itself make producers/consumers created from the
// session run on their own threads — confirm what behaviour was intended here.
Thread sessionThreads = new Thread(current_session);
sessionThreads.start();

如果不是,能否请哪位给我一个代码示例,说明如何让会话以线程化的方式运行?

我想要的是让生产者/消费者(producers/consumers)使用并发的会话,各自向其特定的队列写入/监听(write/listen)。我已经尝试过编写自定义线程并把连接传递给线程,但是在创建生产者时遇到了错误:“我无法在未注册的会话上启动生产者”。

java.lang.Thread 没有接受 org.apache.activemq.ActiveMQSession 对象的构造函数。您的代码甚至都无法编译,更不用说运行了。

下面是一个简单的客户端,它会创建分别用于生产和消费消息的线程:

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;

/**
 * Minimal multi-threaded JMS client: one thread consumes a single message from a queue while
 * another thread sends a single text message to the same queue.
 *
 * <p>Both threads share one {@link Connection}, but each thread creates (and closes) its own
 * {@link Session}, so no session is ever used from more than one thread.
 */
public class MyMultiThreadedApp {

   /**
    * Thread that opens its own session on the shared connection and waits up to five seconds
    * for one message on the given destination.
    */
   static class MyConsumer extends Thread {
      private final Connection connection;
      private final Destination destination;

      /**
       * @param connection  shared connection on which this thread creates its session
       * @param destination destination to receive from
       */
      MyConsumer(Connection connection, Destination destination) {
         this.connection = connection;
         this.destination = destination;
      }

      /** Receives at most one message and prints the outcome; failures are printed, not rethrown. */
      @Override
      public void run() {
         // The session is created inside run() so that it belongs to this thread only;
         // try-with-resources closes it (and its consumer) on every exit path.
         try (Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE)) {
            MessageConsumer messageConsumer = session.createConsumer(destination);
            // Delivery to consumers only begins once the connection has been started.
            connection.start();
            Message message = messageConsumer.receive(5000);
            if (message == null) {
               System.out.println("Did not receive message within the allotted time.");
               return;
            }
            System.out.println("Received message: " + message);
         } catch (Exception e) {
            // Thread boundary: report the failure. Errors (e.g. OutOfMemoryError) are
            // deliberately not caught so they reach the uncaught-exception handler.
            e.printStackTrace();
         }
      }
   }

   /**
    * Thread that opens its own session on the shared connection and sends one text message
    * to the given destination.
    */
   static class MyProducer extends Thread {
      private final Connection connection;
      private final Destination destination;

      /**
       * @param connection  shared connection on which this thread creates its session
       * @param destination destination to send to
       */
      MyProducer(Connection connection, Destination destination) {
         this.connection = connection;
         this.destination = destination;
      }

      /** Sends one text message and prints a confirmation; failures are printed, not rethrown. */
      @Override
      public void run() {
         // One session per thread, closed automatically when the block exits.
         try (Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE)) {
            MessageProducer messageProducer = session.createProducer(destination);
            messageProducer.send(session.createTextMessage("My message"));
            System.out.println("Sent message");
         } catch (Exception e) {
            // Thread boundary: report the failure; Errors are left to propagate.
            e.printStackTrace();
         }
      }
   }

   /**
    * Looks up the queue and connection factory via JNDI, runs one consumer thread and one
    * producer thread against a shared connection, and waits for both to finish.
    *
    * @param args unused
    * @throws Exception if the JNDI lookup, connection creation, or cleanup fails, or if the
    *                   main thread is interrupted while waiting for the workers
    */
   public static void main(String... args) throws Exception {
      MyMultiThreadedApp myMultiThreadedApp = new MyMultiThreadedApp();
      InitialContext initialContext = new InitialContext();
      try {
         Queue queue = (Queue) initialContext.lookup("queue/exampleQueue");
         ConnectionFactory cf = (ConnectionFactory) initialContext.lookup("ConnectionFactory");
         // Close the connection once both workers are done so broker-side resources are released.
         try (Connection connection = cf.createConnection()) {
            Thread myConsumer = myMultiThreadedApp.runConsumer(connection, queue);
            Thread myProducer = myMultiThreadedApp.runProducer(connection, queue);
            myConsumer.join();
            myProducer.join();
         }
      } finally {
         // InitialContext is not AutoCloseable, so it is closed explicitly.
         initialContext.close();
      }
   }

   /**
    * Creates and starts a consumer thread.
    *
    * @param connection  shared connection
    * @param destination destination to receive from
    * @return the started thread, so the caller can join it
    */
   private Thread runConsumer(Connection connection, Destination destination) {
      MyConsumer myConsumer = new MyConsumer(connection, destination);
      myConsumer.start();
      return myConsumer;
   }

   /**
    * Creates and starts a producer thread.
    *
    * @param connection  shared connection
    * @param destination destination to send to
    * @return the started thread, so the caller can join it
    */
   private Thread runProducer(Connection connection, Destination destination) {
      MyProducer myProducer = new MyProducer(connection, destination);
      myProducer.start();
      return myProducer;
   }
}