尝试为 JMS 队列设置侦听器时,此方法不适用于应用程序服务器内部

This method is not applicable inside the app server when trying to set a listener for a JMS queue

我正在试用 JMS 2.0,因此我可以决定它是否值得在我的项目中应用。我可以成功创建一个 send/receive 应用程序。

现在我想让监听器在队列中有可用消息时立即接收消息(我的最终目标是让不同的监听器到同一个队列,每个监听器都有不同的消息选择器。

目前我有这个 class:

package learning.jms;


import java.io.Serializable;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import javax.enterprise.context.RequestScoped;
import javax.enterprise.context.SessionScoped;
import javax.faces.application.FacesMessage;
import javax.faces.context.FacesContext;
import javax.inject.Inject;
import javax.inject.Named;
import javax.jms.JMSConnectionFactory;
import javax.jms.JMSConsumer;
import javax.jms.JMSContext;
import javax.jms.JMSException;
import javax.jms.JMSRuntimeException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;

@Named(value="senderBean")
@SessionScoped
public class SenderBean implements Serializable {

    /**
     * 
     */
    private static final long serialVersionUID = 1L;

    @Resource(mappedName="queues/myQueue")
    private transient Queue myQueue;

    @Inject
    @JMSConnectionFactory("java:/DefaultJMSConnectionFactory")
    private transient JMSContext context;

    private String messageText;

    private int nextType = 3;
    private transient JMSConsumer consumer;
    private transient JMSConsumer consumer2;
    private transient JMSConsumer consumer3;

    public SenderBean() {
    }

    @PostConstruct
    public void setUp(){

    }

    public String getMessageText() {
        return messageText;
    }

    public void setMessageText(String messageText) {
        this.messageText = messageText;
    }

    public void sendJMSMessageToMyQueue() {
        try {

            consumer = context.createConsumer(myQueue, "type=1");
            consumer.setMessageListener(new ListenerTypeOne());

//          consumer2 = context.createConsumer(myQueue, "type=2");
//          consumer2.setMessageListener(new ListenerTypeTwo());
//          
//          consumer3 = context.createConsumer(myQueue, "type=3");
//          consumer3.setMessageListener(new ListenerTypeThree());

           String text = "Message from producer: " + messageText;
           Message m1 = context.createTextMessage(text);
           m1.setIntProperty("type", nextType);

           System.out.println("producer sending msg type " + nextType + "value: " + text);

           nextType = (nextType++%3)+1;


           context.createProducer().send(myQueue, m1);

           FacesMessage facesMessage =
                   new FacesMessage("Sent message: " + text);
           FacesContext.getCurrentInstance().addMessage(null, facesMessage);
       } catch (JMSRuntimeException | JMSException t) {
            System.out.println(t.toString());
       }
   }

    private class ListenerTypeOne implements MessageListener{

        @Override
        public void onMessage(Message msg) {
            try {
                System.out.println("Msg received by typeOne:" + msg.getBody(String.class));
            } catch (JMSException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }

    private class ListenerTypeTwo implements MessageListener{

        @Override
        public void onMessage(Message msg) {
            try {
                System.out.println("Msg received by typeTwo:" + msg.getBody(String.class));
            } catch (JMSException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

        }
    }

    private class ListenerTypeThree implements MessageListener{

        @Override
        public void onMessage(Message msg) {
            try {
                System.out.println("Msg received by typeThree:" + msg.getBody(String.class));
            } catch (JMSException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

        }
    }
}

我注释掉了两个消费者,所以我可以专注于做一个工作。 我不断在 setMessageListener 行收到以下异常:

 javax.jms.IllegalStateException: This method is not applicable inside the application server. See the J2EE spec, e.g. J2EE1.4 Section 6.6
    at org.hornetq.ra.HornetQRASession.checkStrict(HornetQRASession.java:1647)
    at org.hornetq.ra.HornetQRAMessageConsumer.setMessageListener(HornetQRAMessageConsumer.java:124)
    at org.hornetq.jms.client.HornetQJMSConsumer.setMessageListener(HornetQJMSConsumer.java:68)

我不知道是什么原因造成的,我的搜索没有提供任何额外信息。 我想这可能与一个组件不应超过一个活动会话这一事实有关。在这种情况下,如何创建多个监听器来监听队列?

(如果重要:我使用的是 Wildfly 8)

编辑 我已将侦听器创建提取到一个单独的 bean 中,但仍然出现相同的错误:

package learning.jms;


import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.jms.JMSConnectionFactory;
import javax.jms.JMSConsumer;
import javax.jms.JMSContext;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;


@ApplicationScoped
public class ListenerOne {
    @Inject
    @JMSConnectionFactory("java:/DefaultJMSConnectionFactory")
    private JMSContext context;

    @Resource(mappedName="queues/myQueue")
    private Queue myQueue;


    private JMSConsumer consumer;

    public void setUp() {
        consumer = context.createConsumer(myQueue, "type=1");
        consumer.setMessageListener(new ListenerTypeOne());


        System.out.println("working");
    }

    private class ListenerTypeOne implements MessageListener{

        @Override
        public void onMessage(Message msg) {
            try {
                System.out.println("Msg received by typeOne:" + msg.getBody(String.class));
            } catch (JMSException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
}

所以,寻找 MDB 解决了这个问题。 我从我试图创建的消费者的任何痕迹中清除了 senderBean 类:

package learning.jms;


import java.io.Serializable;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import javax.enterprise.context.RequestScoped;
import javax.enterprise.context.SessionScoped;
import javax.faces.application.FacesMessage;
import javax.faces.context.FacesContext;
import javax.inject.Inject;
import javax.inject.Named;
import javax.jms.JMSConnectionFactory;
import javax.jms.JMSContext;
import javax.jms.JMSException;
import javax.jms.JMSRuntimeException;
import javax.jms.Message;
import javax.jms.Queue;

@Named(value="senderBean")
@SessionScoped
public class SenderBean implements Serializable{

    /**
     * 
     */
    private static final long serialVersionUID = 1L;

    @Resource(mappedName="queues/myQueue")
    private transient Queue myQueue;

    @Inject
    @JMSConnectionFactory("java:/DefaultJMSConnectionFactory")
    private transient JMSContext context;

    private String messageText;

    private int nextType;

    public SenderBean() {
        // TODO Auto-generated constructor stub
    }

    @PostConstruct
    public void init(){
        nextType=2;
    }

    public String getMessageText() {
        return messageText;
    }

    public void setMessageText(String messageText) {
        this.messageText = messageText;
    }

    public void sendJMSMessageToMyQueue() {
        try {
           String text = "Message from producer: " + messageText;
           Message m1 = context.createTextMessage(text);
           m1.setIntProperty("type", nextType);

           nextType = (nextType++%3)+1;

           context.createProducer().send(myQueue, m1);

           FacesMessage facesMessage =
                   new FacesMessage("Sent message: " + text);
           FacesContext.getCurrentInstance().addMessage(null, facesMessage);
       } catch (JMSRuntimeException | JMSException t) {
            System.out.println(t.toString());
       }
   }
}

(注意它只是会话范围,所以我可以遍历消息类型)

并创建了 3 个 MDB

package learning.jms;


import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;



@MessageDriven(activationConfig = {
        @ActivationConfigProperty(propertyName = "destinationLookup",
            propertyValue = "queues/myQueue"),
        @ActivationConfigProperty(propertyName = "destinationType",
            propertyValue = "javax.jms.Queue"),
        @ActivationConfigProperty(propertyName = "messageSelector",propertyValue = "type=1")
    })
public class ListenerOne implements MessageListener {

    @Override
    public void onMessage(Message msg) {
        try {
            System.out.println("Msg received by typeOne: " + msg.getBody(String.class) + " type: " + msg.getIntProperty("type"));
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}

两个:

package learning.jms;


import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;



@MessageDriven(activationConfig = {
        @ActivationConfigProperty(propertyName = "destinationLookup",
            propertyValue = "queues/myQueue"),
        @ActivationConfigProperty(propertyName = "destinationType",
            propertyValue = "javax.jms.Queue"),
        @ActivationConfigProperty(propertyName = "messageSelector",propertyValue = "type=2")
    })
public class ListenerTwo implements MessageListener {

    @Override
    public void onMessage(Message msg) {
        try {
            System.out.println("Msg received by typeTwo: " + msg.getBody(String.class) + " type: " + msg.getIntProperty("type"));
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}

三个:

package learning.jms;


import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;



@MessageDriven(activationConfig = {
        @ActivationConfigProperty(propertyName = "destinationLookup",
            propertyValue = "queues/myQueue"),
        @ActivationConfigProperty(propertyName = "destinationType",
            propertyValue = "javax.jms.Queue"),
        @ActivationConfigProperty(propertyName = "messageSelector",propertyValue = "type=3")
    })
public class ListenerThree implements MessageListener {

    @Override
    public void onMessage(Message msg) {
        try {
            System.out.println("Msg received by typeThree: " + msg.getBody(String.class) + " type: " + msg.getIntProperty("type"));
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}

现在他们会自动侦听队列中与他们的选择器匹配的消息。

感谢@prabugp 的帮助:)

以上错误可能是因为您试图在同一容器内使用客户端并从基于 JCA 的连接工厂获取连接工厂。

情况 1:如果客户端远离 jms 服务器,则建议使用 jms/RemoteConnectionFactory,并且不会重现上述问题。

情况 2:如果客户端驻留在同一个容器中,则首选来自基于 JCA 的连接工厂 java/JmsXA 的连接。由于 6:7 部分下的 JEE 7 规范存在限制,即 JEE 服务器不允许 EJB/Web 应用程序具有多个活动会话,即您不能拥有遗留 JMS 应用程序。

例如:在一个方法中:

public void startConnection() {
        try {
            TopicConnectionFactory connectionFactory = (TopicConnectionFactory)    getConnectionFactory();
            topicConnection = connectionFactory.createTopicConnection();
            topicSession = topicConnection.createTopicSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);           
            subscriber = topicSession.createSubscriber(messageTopic, selector, true);
            MessageListener messageListener = new MessageListener(this);
            // Code to set message listener.
            subscriber.setMessageListener(messageListener);
            topicConnection.start();
        } catch (Exception e) {
            LOG.error(e, e);
            closeConnection();
            throw new RuntimeException(e);
        }
    }

如果上述代码中的连接工厂来自 @Resource(mappedName = "java:jboss/exported/jms/RemoteConnectionFactory")

那么上面的代码就可以工作了。但是如果我们将连接工厂更改为 //@Resource(mappedName = "java:/JmsXA")

然后会抛出上面的错误。

因此,如果您的客户端位于同一个容器中,则应使用 MDB。由于容器需要控制连接对象以支持两阶段提交协议。

我对此的看法是,JSF bean 中不能有消息侦听器,因为 bean 生命周期由 Web 容器控制。

MDB 是唯一由消息驱动的组件,但 JSF MB,如 EJB 或 servlet,无法侦听消息,它们 "live" 在请求的上下文中创建、激活实例,被容器钝化或破坏。

但是,您可以在请求的上下文中使用 receive(),并在客户端设置一些自动刷新,以实现服务器端驱动的流程。