停止 MDB 消息重新传递
Stop MDB Message Redelivery
我有一个 StatelessSessionBean,它在调用特定方法时会创建一个 MapMessage
以发送到 MDB。
QueueConnection connection = null;
QueueSession mSession = null;
QueueSender messageProducer = null;
try {
QueueConnectionFactory connectionFactory = (QueueConnectionFactory) home(session).getCTX().lookup(DocumentManagementTransactionUtil.QUEUE_CONNECTION_FACTORY);
connection = connectionFactory.createQueueConnection();
mSession = connection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue dest = (Queue) home.getCTX().lookup(DocumentManagementTransactionUtil.QUEUE_DESTINATION);
messageProducer = mSession.createSender(dest);
messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
messageProducer.send(createJMSMessageFordocMessageReceiver(mSession, session, idnr, debugCode));
} catch (Exception ex) {
log.error("failed to start DocumentTransfer MDB", ex);
throw new AsaleException("failed to start DocumentTransfer MDB", ex);
} finally {
try { if (messageProducer != null) messageProducer.close(); } catch (Exception e) { }
try { if (mSession != null) mSession.close(); } catch (Exception e) { }
try { if (connection != null) connection.stop(); connection.close(); } catch (Exception e) { }
}
现在,在我的 MDB 的 onMessage
方法中,我从消息中读出我需要的所有内容并确认它。
MapMessage msg = (MapMessage) message;
project = msg.getLong("project");
Long lang = msg.getLong("lang");
int firm = msg.getInt("opFirm");
int sub = msg.getInt("opSub");
long user = msg.getLong("user");
int debugCode = msg.getInt("debugCode");
log.debug(project + prelog + "DocumentManagementMDBean... Retrieved Message: User: " + user + " Project: " + project + " JMS MessageID: " + message.getJMSMessageID() + " Redelivered: " + message.getJMSRedelivered());
message.acknowledge();
并且一个冗长的操作开始,它需要一个未定义的时间(1 最多 X 分钟)而不从 onMessage
方法返回。
5 分钟后,我的消息(从 ID 和 redelivered
状态可以看出)被重新发送,即使我之前已确认。
我是不是做错了什么,或者有没有办法告诉系统不要重新发送消息?
编辑:
@TransactionManagement(TransactionManagementType.BEAN)
@MessageDriven(mappedName = DocumentManagementTransactionUtil.MAPPED_NAME, activationConfig = {
@ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge"),
@ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
@ActivationConfigProperty(propertyName = "destination", propertyValue = DocumentManagementTransactionUtil.QUEUE_DESTINATION),
@ActivationConfigProperty(propertyName = "dLQMaxResent", propertyValue = "0")})
关于消息驱动 bean 的一切都是事务性的。
如果无法将消息传送到 queue/topic,那么您的事务将失败并被回滚。
如果由于任何原因无法处理消息,则将回滚执行 MDB 的事务。在这种情况下,规范要求容器重试将消息传递到 MDB。
在您的情况下,您的事务似乎超时,之后任何事务性操作(例如数据库访问或 EJB 调用)都将失败并抛出异常。容器随后重试处理消息。
一般来说,您不应尝试在任何 EJB 方法(包括消息驱动的 bean)中执行长 运行 进程。
因为您用 wildfly
标记了它,所以我猜您使用的是 Java EE 7 实现。如果是这种情况,您可以考虑使用 JSR-352 批处理 API 来达到您的目的。
我有一个 StatelessSessionBean,它在调用特定方法时会创建一个 MapMessage
以发送到 MDB。
QueueConnection connection = null;
QueueSession mSession = null;
QueueSender messageProducer = null;
try {
QueueConnectionFactory connectionFactory = (QueueConnectionFactory) home(session).getCTX().lookup(DocumentManagementTransactionUtil.QUEUE_CONNECTION_FACTORY);
connection = connectionFactory.createQueueConnection();
mSession = connection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue dest = (Queue) home.getCTX().lookup(DocumentManagementTransactionUtil.QUEUE_DESTINATION);
messageProducer = mSession.createSender(dest);
messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
messageProducer.send(createJMSMessageFordocMessageReceiver(mSession, session, idnr, debugCode));
} catch (Exception ex) {
log.error("failed to start DocumentTransfer MDB", ex);
throw new AsaleException("failed to start DocumentTransfer MDB", ex);
} finally {
try { if (messageProducer != null) messageProducer.close(); } catch (Exception e) { }
try { if (mSession != null) mSession.close(); } catch (Exception e) { }
try { if (connection != null) connection.stop(); connection.close(); } catch (Exception e) { }
}
现在,在我的 MDB 的 onMessage
方法中,我从消息中读出我需要的所有内容并确认它。
MapMessage msg = (MapMessage) message;
project = msg.getLong("project");
Long lang = msg.getLong("lang");
int firm = msg.getInt("opFirm");
int sub = msg.getInt("opSub");
long user = msg.getLong("user");
int debugCode = msg.getInt("debugCode");
log.debug(project + prelog + "DocumentManagementMDBean... Retrieved Message: User: " + user + " Project: " + project + " JMS MessageID: " + message.getJMSMessageID() + " Redelivered: " + message.getJMSRedelivered());
message.acknowledge();
并且一个冗长的操作开始,它需要一个未定义的时间(1 最多 X 分钟)而不从 onMessage
方法返回。
5 分钟后,我的消息(从 ID 和 redelivered
状态可以看出)被重新发送,即使我之前已确认。
我是不是做错了什么,或者有没有办法告诉系统不要重新发送消息?
编辑:
@TransactionManagement(TransactionManagementType.BEAN)
@MessageDriven(mappedName = DocumentManagementTransactionUtil.MAPPED_NAME, activationConfig = {
@ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge"),
@ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
@ActivationConfigProperty(propertyName = "destination", propertyValue = DocumentManagementTransactionUtil.QUEUE_DESTINATION),
@ActivationConfigProperty(propertyName = "dLQMaxResent", propertyValue = "0")})
关于消息驱动 bean 的一切都是事务性的。
如果无法将消息传送到 queue/topic,那么您的事务将失败并被回滚。
如果由于任何原因无法处理消息,则将回滚执行 MDB 的事务。在这种情况下,规范要求容器重试将消息传递到 MDB。
在您的情况下,您的事务似乎超时,之后任何事务性操作(例如数据库访问或 EJB 调用)都将失败并抛出异常。容器随后重试处理消息。
一般来说,您不应尝试在任何 EJB 方法(包括消息驱动的 bean)中执行长 运行 进程。
因为您用 wildfly
标记了它,所以我猜您使用的是 Java EE 7 实现。如果是这种情况,您可以考虑使用 JSR-352 批处理 API 来达到您的目的。