Message consumer gets blocked after the first message

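I am trying to consume messages from multiple clients and reply to confirm that I received each one. However, when the first client sends a message, it gets no response, and none of the other clients can send any messages after that. It looks as if my consumer class is locked. Any ideas?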

public class Consumer implements MessageListener {
    private static ConnectionFactory factory = null;
    private static Connection connection = null;
    private static Session session = null;
    private static Destination sendQueue = null;
    private static Destination recieveQueue = null;
    private static MessageConsumer consumer = null;
    private static MessageProducer producer = null;

    final static Logger logger = Logger.getLogger(Consumer.class);

    public static void main(String[] args) {

        try {   
            factory = new ActiveMQConnectionFactory( "tcp://localhost:61616");
             connection = factory.createConnection();
            connection.start();
             session = connection.createSession(false,
                    Session.AUTO_ACKNOWLEDGE);
            recieveQueue = session.createQueue("BookingQueue");
             consumer = session.createConsumer(recieveQueue);
        //  JMSMessageListener listener = new JMSMessageListener();
        //  consumer.setMessageListener(listener);

        } catch (Exception e) {
            System.out.println(e);
            e.printStackTrace();
        }
    }

    @Override
    public void onMessage(Message message) {
        if (message instanceof ObjectMessage) {
            ObjectMessage msg = (ObjectMessage)message;
            Booking booking;
            try {
                booking = (Booking) msg.getObject();
                logger.info("Received order for " + booking.getCustomer());
                sendQueue = message.getJMSReplyTo();
                producer = session.createProducer(sendQueue);
                logger.info("# The Agent sending hello");
                TextMessage messageNew = session.createTextMessage("Hello, please reply immediately to my message!");   
                messageNew.setJMSReplyTo(recieveQueue);

                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                producer.send(messageNew);
                consumer.close();
                session.close();
                connection.close();
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

    }

    }


}

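You never registered a MessageListener: the setMessageListener call in main is commented out, so onMessage is not invoked. Note also that onMessage closes the consumer, session, and connection after sending its reply, so even with a listener registered only the first message would be handled. You can solve this by creating a loop that polls the Queue with a MessageConsumer and sends the acknowledgement with a new MessageProducer. Something like the following: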

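import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;

public class Consumer implements Runnable {
    private final Connection connection;
    private final MessageConsumer consumer;
    private final Session producerSession;
    private volatile boolean closed = false;

    // The connection is expected to be started by the caller (connection.start()).
    public Consumer(Connection connection) throws JMSException {
        this.connection = connection;
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination receiveQueue = session.createQueue("BookingQueue");
        consumer = session.createConsumer(receiveQueue);
        // Separate session used only for sending replies.
        producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    }

    public void close() throws JMSException {
        closed = true;
        // Closing the connection also closes its sessions and consumers,
        // which makes a blocked receive() return null.
        connection.close();
    }

    @Override
    public void run() {
        try {
            while (!closed) {
                // Blocks until a message arrives or the consumer is closed.
                Message message = consumer.receive();
                if (message == null) {
                    break; // consumer was closed while waiting
                }
                Destination sendQueue = message.getJMSReplyTo();
                if (sendQueue == null) {
                    continue; // sender did not ask for a reply
                }
                MessageProducer producer = producerSession.createProducer(sendQueue);
                // send message via producer
                producer.close();
            }
        } catch (JMSException e) {
            // An exception during shutdown is expected; report anything else.
            if (!closed) {
                e.printStackTrace();
            }
        }
    }
}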

The MessageListener has been replaced by a call to consumer.receive().
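
If you want to try the shape of that loop without a broker, here is a minimal sketch that uses only the JDK. It is not JMS code: a java.util.concurrent.BlockingQueue stands in for "BookingQueue", each request carries its own reply queue as a rough analogue of JMSReplyTo, and the names PollingLoopSketch, Request, Worker, and demo are made up for this illustration. The point it tries to show is that the worker replies and keeps looping, and that nothing is closed per message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Stand-in for the JMS polling loop (assumption: BlockingQueue replaces the broker).
class PollingLoopSketch {

    // Hypothetical request type: a payload plus the queue to reply on.
    static final class Request {
        final String customer;
        final BlockingQueue<String> replyTo;

        Request(String customer, BlockingQueue<String> replyTo) {
            this.customer = customer;
            this.replyTo = replyTo;
        }
    }

    // Hypothetical worker mirroring the Runnable consumer above.
    static final class Worker implements Runnable {
        private final BlockingQueue<Request> requests;
        private volatile boolean closed = false;

        Worker(BlockingQueue<Request> requests) {
            this.requests = requests;
        }

        void close() {
            closed = true;
        }

        @Override
        public void run() {
            try {
                while (!closed) {
                    // Timed poll so the loop re-checks the closed flag regularly.
                    Request request = requests.poll(100, TimeUnit.MILLISECONDS);
                    if (request == null) {
                        continue; // nothing arrived in this interval
                    }
                    // Reply, then keep looping; resources stay open between messages.
                    request.replyTo.put("Received order for " + request.customer);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    // Sends one request per customer and collects the replies in order.
    static List<String> demo(String... customers) {
        BlockingQueue<Request> bookingQueue = new LinkedBlockingQueue<>();
        Worker worker = new Worker(bookingQueue);
        Thread thread = new Thread(worker);
        thread.start();
        List<String> replies = new ArrayList<>();
        try {
            for (String customer : customers) {
                BlockingQueue<String> replyQueue = new LinkedBlockingQueue<>();
                bookingQueue.put(new Request(customer, replyQueue));
                replies.add(replyQueue.poll(5, TimeUnit.SECONDS));
            }
            worker.close();
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return replies;
    }

    public static void main(String[] args) {
        System.out.println(demo("alice", "bob", "carol"));
    }
}
```

Every client gets a reply here because the worker only stops when close() is called from outside the loop. In the real JMS version the same idea applies: keep the consumer, session, and connection open for the lifetime of the loop and close them once at shutdown.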