将 Tomcat 与 ActiveMQ 和 Spring 的 DefaultMessageListenerContainer 集成 - 再次重新传递 JMS 消息
Integrated Tomcat with ActiveMQ and Spring's DefaultMessageListenerContainer - redeliver JMS msg again
我有以下设置:
Tomcat 带有嵌入式 ActiveMQ
我使用 Spring 集成的 JmsMessageDrivenChannelAdapter
创建适配器,它使用来自 ActiveMQ 队列的消息,如下所示:
Jms.messageDrivenChannelAdapter(jmsConnectionFactory, TransactedMessageListenerContainer.class)
.destination(destination)
.errorChannel(errorChannel)
.get();
TransactedMessageListenerContainer 在哪里
public class TransactedMessageListenerContainer extends DefaultMessageListenerContainer {
public TransactedMessageListenerContainer() {
this.setSessionTransacted(true);
}
}
如果发生异常,ActiveMQ 代理不会相应地重新传递消息。
当我使用org.apache.activemq.broker.BrokerService
进行简单的集成测试时,JMS消息被重新传递,即我可以实现重试机制
如何使用 ActiveMQ 为 Tomcat 实现同样的效果?
我在这里发现:http://activemq.apache.org/tomcat.html,手动将 ActiveMQ 与 Tomcat 集成确实允许主题、队列和 ConnectionFactory 注入,但不支持事务发送和传递,但我不确定是否有一些解决方法
感谢帮助!
更新:
我还在错误处理程序中重新抛出异常,如下所示:
@Bean
public IntegrationFlow errorHandlingFlow() {
return IntegrationFlows.from(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
.handle(this::errorMessageHandler)
.get();
}
public void errorMessageHandler(Message<?> message) {
log.warn("handling error message");
log.warn("headers: " + message.getHeaders().toString());
log.warn("payload: " + message.getPayload().toString());
MessagingException exception = (MessagingException) message.getPayload();
log.warn("original payload: " + exception.getFailedMessage().getPayload());
throw exception; // make JMS broker redeliver
}
如果你想要回滚和重新传递,你的错误流必须重新抛出异常,而不是像默认情况下那样吞没 errorChannel
。你可以在这里找到类似的问题和答案。
更新
嗯,不确定你的问题出在哪里我有一个 test-case 与你的相似:
@Autowired
private MessageChannel errorChannel;
@Bean
public IntegrationFlow jmsMessageDrivenFlow() {
return IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(this.jmsConnectionFactory)
.configureListenerContainer(c -> c.sessionTransacted(true))
.errorChannel(this.errorChannel)
.destination("jmsMessageDriver"))
.<String, String>transform(p -> {
throw new RuntimeException("intentional");
})
.get();
}
@Bean
public IntegrationFlow errorHandlingFlow() {
return IntegrationFlows.from(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
.handle(m -> {
MessagingException exception = (MessagingException) m.getPayload();
Message<?> failedMessage = exception.getFailedMessage();
throw exception;
})
.get();
}
在重新投递时,我在 failedMessage
中看到这些 headers:
"jms_redelivered" -> "true"
"JMSXDeliveryCount" -> "2"
是的,我们在测试中确实使用了嵌入式 ActiveMQ。
顺便说一下,为 Spring 引导 ActiveMQAutoConfiguration
提供了 JMS ConnectionFactory
。
我有以下设置: Tomcat 带有嵌入式 ActiveMQ
我使用 Spring 集成的 JmsMessageDrivenChannelAdapter
创建适配器,它使用来自 ActiveMQ 队列的消息,如下所示:
Jms.messageDrivenChannelAdapter(jmsConnectionFactory, TransactedMessageListenerContainer.class)
.destination(destination)
.errorChannel(errorChannel)
.get();
TransactedMessageListenerContainer 在哪里
public class TransactedMessageListenerContainer extends DefaultMessageListenerContainer {
public TransactedMessageListenerContainer() {
this.setSessionTransacted(true);
}
}
如果发生异常,ActiveMQ 代理不会相应地重新传递消息。
当我使用org.apache.activemq.broker.BrokerService
进行简单的集成测试时,JMS消息被重新传递,即我可以实现重试机制
如何使用 ActiveMQ 为 Tomcat 实现同样的效果?
我在这里发现:http://activemq.apache.org/tomcat.html,手动将 ActiveMQ 与 Tomcat 集成确实允许主题、队列和 ConnectionFactory 注入,但不支持事务发送和传递,但我不确定是否有一些解决方法
感谢帮助!
更新: 我还在错误处理程序中重新抛出异常,如下所示:
@Bean
public IntegrationFlow errorHandlingFlow() {
return IntegrationFlows.from(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
.handle(this::errorMessageHandler)
.get();
}
public void errorMessageHandler(Message<?> message) {
log.warn("handling error message");
log.warn("headers: " + message.getHeaders().toString());
log.warn("payload: " + message.getPayload().toString());
MessagingException exception = (MessagingException) message.getPayload();
log.warn("original payload: " + exception.getFailedMessage().getPayload());
throw exception; // make JMS broker redeliver
}
如果你想要回滚和重新传递,你的错误流必须重新抛出异常,而不是像默认情况下那样吞没 errorChannel
。你可以在这里找到类似的问题和答案。
更新
嗯,不确定你的问题出在哪里我有一个 test-case 与你的相似:
@Autowired
private MessageChannel errorChannel;
@Bean
public IntegrationFlow jmsMessageDrivenFlow() {
return IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(this.jmsConnectionFactory)
.configureListenerContainer(c -> c.sessionTransacted(true))
.errorChannel(this.errorChannel)
.destination("jmsMessageDriver"))
.<String, String>transform(p -> {
throw new RuntimeException("intentional");
})
.get();
}
@Bean
public IntegrationFlow errorHandlingFlow() {
return IntegrationFlows.from(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
.handle(m -> {
MessagingException exception = (MessagingException) m.getPayload();
Message<?> failedMessage = exception.getFailedMessage();
throw exception;
})
.get();
}
在重新投递时,我在 failedMessage
中看到这些 headers:
"jms_redelivered" -> "true"
"JMSXDeliveryCount" -> "2"
是的,我们在测试中确实使用了嵌入式 ActiveMQ。
顺便说一下,为 Spring 引导 ActiveMQAutoConfiguration
提供了 JMS ConnectionFactory
。