连续接收来自 Activemq 的消息

Recieve Message from Active MQ continously

下面是我的代码,用于从 java

中的活动 mq 接收消息
public void main(){
                   try{ // Create a ConnectionFactory
  ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("myurl");

  // Create a Connection
  connection = connectionFactory.createConnection();
  connection.start();
  // System.out.println(connection.toString());

  //connection.setExceptionListener(this);

  // Create a Session
  session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

  // Create the destination (Topic or Queue)
  Destination destination = session.createQueue("myqueuename");
  consumer = session.createConsumer(destination);
while (true)//never ending loop
  {
    message = consumer.receive(); //System would wait here to read the message from the queue
    if (message instanceof TextMessage)
    {
      TextMessage textMessage = (TextMessage) message;
      String text = textMessage.getText();

     System.out.println(text);

      message.acknowledge();
      session.commit(); 
    }

    }
}
finaly{
closeallconnection();
}

由于while条件为真,循环会一直运行和message.receive等待,直到有新消息。所以,这里的问题是我从 apache tomcat 中 运行ning 并且成功 运行ning,但是当我关闭 tomcat 时,这段代码是 运行即使我关闭 tomcat 也会落后。 如果我再次启动 tomcat,即使存在现有代码,也会有另一个服务 运行。 因此,我在这里需要的帮助是如何连续接收消息并等待新消息,以及当我关闭 tomcat 时如何关闭此 jms 侦听器。因为,我不知道队列中会有多少消息,也不知道我什么时候会收到消息。任何帮助和建议都会很棒。

我认为这不是最好的方法。它无法扩展。

你不想要一个listener/processor;你想要一堆。当消息到达队列时,侦听器将从池中签出,处理该消息,并在完成后返回池中。

这会扩展,因为多个侦听器可以处理繁重的负载。每个请求一条消息。

您展示的并不是真正的生产级代码。虽然执行接收循环不是一个坏主意,但您需要在幕后提供一些样板逻辑,例如将该代码放入可配置数量的线程中,处理异常和重新连接,并集成到您的应用程序生命周期中。

您通常需要一些 Web 应用程序框架,例如 spring 来帮助完成这些任务。

在 Spring 的最新版本中,您甚至可以使用注释来做一些事情,这使得事情变得非常简单。使用 spring 引导,您不需要繁重的配置,而是 属性 文件中的简单连接字符串。

@Component
public class MessageListener {

    @JmsListener(destination = "my.queue")
    public void receiveMessage(String msg) {
        System.out.println("Received :" + msg);
    }
}

我和你@GRK 有同样的问题,我不想使用无限循环来让消费者倾听。 This 是我发现似乎对我有用的最简单的解决方案。

            MessageConsumer consumer = session.createConsumer(queue);

            consumer.setMessageListener(new MessageListener() {
                public void onMessage(Message msg) {
                    try {
                        if (!(msg instanceof TextMessage))
                            throw new RuntimeException("no text message");
                        TextMessage tm = (TextMessage) msg;
                        System.out.println(tm.getText()); // print message
                    } catch (JMSException e) {
                        System.err.println("Error reading message");
                    }
                }
            });

通过使用 MessageListener 或扩展 JMS MessageListener 的 class,您可以处理任何传入的消息,并且仍然部署代码,而不是挂在无限循环中。