消息使用者在第一条消息后被阻止
Message consumer is got blocked after first message
我正在尝试使用来自多个客户端的消息并响应我收到了消息,但是当第一个客户端发送消息时,他没有收到任何响应,而其他客户端的 none可以发送任何消息。好像我的消费者 class 被锁定了。有什么想法吗?
public class Consumer implements MessageListener {
private static ConnectionFactory factory = null;
private static Connection connection = null;
private static Session session = null;
private static Destination sendQueue = null;
private static Destination recieveQueue = null;
private static MessageConsumer consumer = null;
private static MessageProducer producer = null;
final static Logger logger = Logger.getLogger(Consumer.class);
public static void main(String[] args) {
try {
factory = new ActiveMQConnectionFactory( "tcp://localhost:61616");
connection = factory.createConnection();
connection.start();
session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
recieveQueue = session.createQueue("BookingQueue");
consumer = session.createConsumer(recieveQueue);
// JMSMessageListener listener = new JMSMessageListener();
// consumer.setMessageListener(listener);
} catch (Exception e) {
System.out.println(e);
e.printStackTrace();
}
}
@Override
public void onMessage(Message message) {
if (message instanceof ObjectMessage) {
ObjectMessage msg = (ObjectMessage)message;
Booking booking;
try {
booking = (Booking) msg.getObject();
logger.info("Received order for " + booking.getCustomer());
sendQueue = message.getJMSReplyTo();
producer = session.createProducer(sendQueue);
logger.info("# The Agent sending hello");
TextMessage messageNew = session.createTextMessage("Hello, please reply immediately to my message!");
messageNew.setJMSReplyTo(recieveQueue);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
producer.send(messageNew);
consumer.close();
session.close();
connection.close();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
您没有用 Session
注册 MessageListener
。您可以通过创建一个无限循环来解决此问题,该循环使用 MessageConsumer
轮询 Queue
,使用新的 MessageProducer
发送确认。类似于以下内容:
public class Consumer implements Runnable {
private final Connection connection;
private final MessageConsumer consumer;
private final Session producerSession;
private volatile boolean closed = false;
public Consumer(Connection connection) {
this.connection = connection;
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination receiveQueue = session.createQueue("BookingQueue");
consumer = session.createConsumer(receiveQueue);
producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
public void close() {
closed = true;
connection.close();
}
@Override
public void run() {
while(!closed) {
Message message = consumer.receive();
Destination sendQueue = message.getJMSReplyTo();
MessageProducer producer = producerSession.createProducer(sendQueue);
// send message via producer
producer.close();
}
}
}
MessageListener
已替换为对 consumer.receive()
的调用
我正在尝试使用来自多个客户端的消息并响应我收到了消息,但是当第一个客户端发送消息时,他没有收到任何响应,而其他客户端的 none可以发送任何消息。好像我的消费者 class 被锁定了。有什么想法吗?
public class Consumer implements MessageListener {
private static ConnectionFactory factory = null;
private static Connection connection = null;
private static Session session = null;
private static Destination sendQueue = null;
private static Destination recieveQueue = null;
private static MessageConsumer consumer = null;
private static MessageProducer producer = null;
final static Logger logger = Logger.getLogger(Consumer.class);
public static void main(String[] args) {
try {
factory = new ActiveMQConnectionFactory( "tcp://localhost:61616");
connection = factory.createConnection();
connection.start();
session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
recieveQueue = session.createQueue("BookingQueue");
consumer = session.createConsumer(recieveQueue);
// JMSMessageListener listener = new JMSMessageListener();
// consumer.setMessageListener(listener);
} catch (Exception e) {
System.out.println(e);
e.printStackTrace();
}
}
@Override
public void onMessage(Message message) {
if (message instanceof ObjectMessage) {
ObjectMessage msg = (ObjectMessage)message;
Booking booking;
try {
booking = (Booking) msg.getObject();
logger.info("Received order for " + booking.getCustomer());
sendQueue = message.getJMSReplyTo();
producer = session.createProducer(sendQueue);
logger.info("# The Agent sending hello");
TextMessage messageNew = session.createTextMessage("Hello, please reply immediately to my message!");
messageNew.setJMSReplyTo(recieveQueue);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
producer.send(messageNew);
consumer.close();
session.close();
connection.close();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
您没有用 Session
注册 MessageListener
。您可以通过创建一个无限循环来解决此问题,该循环使用 MessageConsumer
轮询 Queue
,使用新的 MessageProducer
发送确认。类似于以下内容:
public class Consumer implements Runnable {
private final Connection connection;
private final MessageConsumer consumer;
private final Session producerSession;
private volatile boolean closed = false;
public Consumer(Connection connection) {
this.connection = connection;
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination receiveQueue = session.createQueue("BookingQueue");
consumer = session.createConsumer(receiveQueue);
producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
public void close() {
closed = true;
connection.close();
}
@Override
public void run() {
while(!closed) {
Message message = consumer.receive();
Destination sendQueue = message.getJMSReplyTo();
MessageProducer producer = producerSession.createProducer(sendQueue);
// send message via producer
producer.close();
}
}
}
MessageListener
已替换为对 consumer.receive()