Java JMS - 消息侦听器和 onException
Java JMS - Message Listener and onException
我有一个应用程序,它有一个主线程和一个 JMS 线程,它们通过 ActiveMQ 5.15.11 相互通信。我可以很好地发送消息,但是我想要一种发回状态或错误的方法。我注意到 MessageListener
允许 onSuccess()
和 onException(ex)
作为两个事件来监听,但是我发现只有 onSuccess()
被调用。
这是我的代码片段。
JMS 线程:
ConnectionFactory factory = super.getConnectionFactory();
Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(super.getQueue());
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(m -> {
try {
super.processRmbnConfigMsg(m);
} catch (JMSException | IOException e) {
LOG.error(e.getMessage(), e);
// I can only use RuntimeException.
// Also this exception is what I am expecting to get passed to the onException(..)
// call in the main thread.
throw new RuntimeException(e);
}
});
connection.start();
主线程(向JMS发送消息):
sendMessage(xml, new AsyncCallback() {
@Override
public void onException(JMSException e) {
// I am expecting this to be that RuntimeException from the JMS thread.
LOG.error("Error", e);
doSomethingWithException(e);
}
@Override
public void onSuccess() {
LOG.info("Success");
}
});
我期望的是 new RuntimeException(e)
中抛出的异常将以某种方式在 onException(JMSException e)
事件侦听器中被拾取,即使 RuntimeException
被包装。
相反,我总是收到 onSuccess()
事件。我想 onException(..)
事件是在通信问题期间发生的,但我想要一种发送回调用者异常的方法。
如何实现在 JMS 线程中收集错误并将其发送回调用线程的目标?
您的期望是基于对 JMS 的根本误解。
代理消息传递的基本原则之一是生产者和消费者在逻辑上彼此断开连接。换句话说......生产者向代理发送消息,它不一定关心它是否被消费成功,它当然不会知道谁消费它或有任何保证何时会被消费。同样,消费者不一定知道消息发送的时间、原因或发送者。这为生产者和消费者之间提供了极大的灵活性。 JMS 遵循断开连接的生产者和消费者这一原则。
消费者没有直接方式通知生产者其发送的消息的消费问题。也就是说,您 可以 使用所谓的 "request/response pattern" 以便消费者可以向生产者提供某种反馈。您可以找到此模式的解释以及示例代码 here.
此外,您使用的 AsyncCallback
class 不是 JMS 的一部分。我相信它是由 ActiveMQ 本身专门提供的 org.apache.activemq.AsyncCallback
,它只为实际的 send 操作提供成功或失败的回调(即不用于消息的消费)。
最后,您应该知道从 javax.jms.MessageListener
的 onMessage
方法中抛出 RuntimeException
被 JMS 规范视为 "programming error",应该避免。 JMS 2 规范的第 8.7 节指出:
It is possible for a listener to throw a RuntimeException
; however, this is considered a client programming error. Well behaved listeners should catch such exceptions and attempt to divert messages causing them to some form of application-specific 'unprocessable message' destination.
The result of a listener throwing a RuntimeException
depends on the session's acknowledgment mode.
AUTO_ACKNOWLEDGE
or DUPS_OK_ACKNOWLEDGE
- the message will be immediately redelivered. The number of times a JMS provider will redeliver the same message before giving up is provider-dependent. The JMSRedelivered message header field will be set, and the JMSXDeliveryCount message property incremented, for a message redelivered under these circumstances.
CLIENT_ACKNOWLEDGE
- the next message for the listener is delivered. If a client wishes to have the previous unacknowledged message redelivered, it must manually recover the session.
Transacted Session - the next message for the listener is delivered. The client can either commit or roll back the session (in other words, a RuntimeException
does not automatically rollback the session).
我有一个应用程序,它有一个主线程和一个 JMS 线程,它们通过 ActiveMQ 5.15.11 相互通信。我可以很好地发送消息,但是我想要一种发回状态或错误的方法。我注意到 MessageListener
允许 onSuccess()
和 onException(ex)
作为两个事件来监听,但是我发现只有 onSuccess()
被调用。
这是我的代码片段。
JMS 线程:
ConnectionFactory factory = super.getConnectionFactory();
Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(super.getQueue());
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(m -> {
try {
super.processRmbnConfigMsg(m);
} catch (JMSException | IOException e) {
LOG.error(e.getMessage(), e);
// I can only use RuntimeException.
// Also this exception is what I am expecting to get passed to the onException(..)
// call in the main thread.
throw new RuntimeException(e);
}
});
connection.start();
主线程(向JMS发送消息):
sendMessage(xml, new AsyncCallback() {
@Override
public void onException(JMSException e) {
// I am expecting this to be that RuntimeException from the JMS thread.
LOG.error("Error", e);
doSomethingWithException(e);
}
@Override
public void onSuccess() {
LOG.info("Success");
}
});
我期望的是 new RuntimeException(e)
中抛出的异常将以某种方式在 onException(JMSException e)
事件侦听器中被拾取,即使 RuntimeException
被包装。
相反,我总是收到 onSuccess()
事件。我想 onException(..)
事件是在通信问题期间发生的,但我想要一种发送回调用者异常的方法。
如何实现在 JMS 线程中收集错误并将其发送回调用线程的目标?
您的期望是基于对 JMS 的根本误解。
代理消息传递的基本原则之一是生产者和消费者在逻辑上彼此断开连接。换句话说......生产者向代理发送消息,它不一定关心它是否被消费成功,它当然不会知道谁消费它或有任何保证何时会被消费。同样,消费者不一定知道消息发送的时间、原因或发送者。这为生产者和消费者之间提供了极大的灵活性。 JMS 遵循断开连接的生产者和消费者这一原则。
消费者没有直接方式通知生产者其发送的消息的消费问题。也就是说,您 可以 使用所谓的 "request/response pattern" 以便消费者可以向生产者提供某种反馈。您可以找到此模式的解释以及示例代码 here.
此外,您使用的 AsyncCallback
class 不是 JMS 的一部分。我相信它是由 ActiveMQ 本身专门提供的 org.apache.activemq.AsyncCallback
,它只为实际的 send 操作提供成功或失败的回调(即不用于消息的消费)。
最后,您应该知道从 javax.jms.MessageListener
的 onMessage
方法中抛出 RuntimeException
被 JMS 规范视为 "programming error",应该避免。 JMS 2 规范的第 8.7 节指出:
It is possible for a listener to throw a
RuntimeException
; however, this is considered a client programming error. Well behaved listeners should catch such exceptions and attempt to divert messages causing them to some form of application-specific 'unprocessable message' destination. The result of a listener throwing aRuntimeException
depends on the session's acknowledgment mode.
AUTO_ACKNOWLEDGE
orDUPS_OK_ACKNOWLEDGE
- the message will be immediately redelivered. The number of times a JMS provider will redeliver the same message before giving up is provider-dependent. The JMSRedelivered message header field will be set, and the JMSXDeliveryCount message property incremented, for a message redelivered under these circumstances.
CLIENT_ACKNOWLEDGE
- the next message for the listener is delivered. If a client wishes to have the previous unacknowledged message redelivered, it must manually recover the session.Transacted Session - the next message for the listener is delivered. The client can either commit or roll back the session (in other words, a
RuntimeException
does not automatically rollback the session).