将 Tomcat 与 ActiveMQ 和 Spring 的 DefaultMessageListenerContainer 集成 - 再次重新传递 JMS 消息

Integrated Tomcat with ActiveMQ and Spring's DefaultMessageListenerContainer - redeliver JMS msg again

我有以下设置: Tomcat 带有嵌入式 ActiveMQ

我使用 Spring 集成的 JmsMessageDrivenChannelAdapter 创建适配器,它使用来自 ActiveMQ 队列的消息,如下所示:

 Jms.messageDrivenChannelAdapter(jmsConnectionFactory, TransactedMessageListenerContainer.class)
      .destination(destination)
      .errorChannel(errorChannel)
     .get();

TransactedMessageListenerContainer 在哪里

public class TransactedMessageListenerContainer extends DefaultMessageListenerContainer {

     public TransactedMessageListenerContainer() {
         this.setSessionTransacted(true);
     } 
}

如果发生异常,ActiveMQ 代理不会相应地重新传递消息。

当我使用org.apache.activemq.broker.BrokerService进行简单的集成测试时,JMS消息被重新传递,即我可以实现重试机制

如何使用 ActiveMQ 为 Tomcat 实现同样的效果?

我在这里发现:http://activemq.apache.org/tomcat.html,手动将 ActiveMQ 与 Tomcat 集成确实允许主题、队列和 ConnectionFactory 注入,但不支持事务发送和传递,但我不确定是否有一些解决方法

感谢帮助!

更新: 我还在错误处理程序中重新抛出异常,如下所示:

@Bean
public IntegrationFlow errorHandlingFlow() {
    return IntegrationFlows.from(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
             .handle(this::errorMessageHandler)
             .get();
}

public void errorMessageHandler(Message<?> message) {
    log.warn("handling error message");
    log.warn("headers: " + message.getHeaders().toString());
    log.warn("payload: " + message.getPayload().toString());
    MessagingException exception = (MessagingException) message.getPayload();
    log.warn("original payload: " + exception.getFailedMessage().getPayload());
    throw exception; // make JMS broker redeliver
}

如果你想要回滚和重新传递,你的错误流必须重新抛出异常,而不是像默认情况下那样吞没 errorChannel。你可以在这里找到类似的问题和答案。

更新

嗯,不确定你的问题出在哪里我有一个 test-case 与你的相似:

    @Autowired
    private MessageChannel errorChannel;

    @Bean
    public IntegrationFlow jmsMessageDrivenFlow() {
        return IntegrationFlows
                .from(Jms.messageDrivenChannelAdapter(this.jmsConnectionFactory)
                        .configureListenerContainer(c -> c.sessionTransacted(true))
                        .errorChannel(this.errorChannel)
                        .destination("jmsMessageDriver"))
                .<String, String>transform(p -> {
                    throw new RuntimeException("intentional");
                })
                .get();
    }

    @Bean
    public IntegrationFlow errorHandlingFlow() {
        return IntegrationFlows.from(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
                .handle(m -> {
                    MessagingException exception = (MessagingException) m.getPayload();
                    Message<?> failedMessage = exception.getFailedMessage();
                    throw exception;
                })
                .get();
    }

在重新投递时,我在 failedMessage 中看到这些 headers:

"jms_redelivered" -> "true"
"JMSXDeliveryCount" -> "2"

是的,我们在测试中确实使用了嵌入式 ActiveMQ。 顺便说一下,为 Spring 引导 ActiveMQAutoConfiguration 提供了 JMS ConnectionFactory