在 DefaultMessageListenerContainer 中配置 sessionAcknowledgeMode

Configuring sessionAcknowledgeMode in DefaultMessageListenerContainer

我有一个设置,我必须从 ActiveMQ 代理中的队列中读取消息。阅读邮件后,我必须对邮件进行长运行操作。

由于对消息的这种长时间 运行 操作,我想尽快确认消息,以便释放代理上的资源。计划是在收到消息后执行以下步骤:

  1. 从 ActiveMQ 获取消息
  2. 将消息插入数据库
  3. 确认消息
  4. 对消息
  5. 做一些长运行操作

我已经了解了 JMS 和不同的确认模式,所以在尝试这样做之前,我决定设置一个应用程序,我可以在其中尝试不同的模式以了解它们是如何处理的,不幸的是我似乎无法得到我想要的输出。

根据此答案中的信息 as well as https://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/jms/listener/DefaultMessageListenerContainer.html 我认为通过使用 AUTO_ACKNOWLEDGE 消息会在我的侦听器被调用之前得到确认,但如果我在侦听器中抛出异常,则消息是重新交付。

我尝试过将 setSessionTransacted 设置为 true 和不设置为 true,但在这两种情况下我得到相同的输出。在 JmsListener 中抛出异常时重新传递消息。

JMS的配置

    @Bean
    public ConnectionFactory connectionFactory() {
        ConnectionFactory connectionFactory =
                new ActiveMQConnectionFactory(jmsConfig.getBrokerUrl());

        return connectionFactory;
    }

    @Bean
    public JmsTemplate jmstemplate(){
        JmsTemplate jmsTemplate = new JmsTemplate();
        jmsTemplate.setConnectionFactory(connectionFactory());
        //jmsTemplate.setSessionTransacted(true);
        jmsTemplate.setDefaultDestinationName( jmsConfig.getQueueIn() );
        return jmsTemplate;
    }

    @Bean
    public JmsListenerContainerFactory jmsListenerContainerFactoryxxxx(
                    ConnectionFactory connectionFactory,
            DefaultJmsListenerContainerFactoryConfigurer configurer) {

        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
        //factory.setConcurrency("1");
        factory.setSessionTransacted(true);

        configurer.configure(factory, connectionFactory);
        return factory;
    }

JmsListener

@JmsListener(destination = "B1Q1", containerFactory = "jmsListenerContainerFactoryxxxx")
public  void receiveMessage(Message message) {
    try {
        TextMessage m = (TextMessage) message;
        String messageText = m.getText();

        int retryNum = message.getIntProperty("JMSXDeliveryCount");
        long s = message.getLongProperty("JMSTimestamp");

        Date d = new Date( s );

        String dbText = String.format("Retry %d. Message: %s", retryNum, messageText);

        if ( messageText.toLowerCase().contains("exception") ) {
            logger.info("Creating exception for retry: {}", retryNum);
            throw new RuntimeException();
        }
    } catch (JMSException e) {
        logger.error("Exception!!", e);
    }
}

我应该如何更改代码,以便在抛出异常时不会重新传递消息?

回到我的应用程序,我将在其中将消息插入数据库。在将消息插入数据库之后但在执行 long-运行 任务之前,我如何通过 JmsListener 确认消息?

为了能够使用 AUTO_ACKNOWLEDGECLIENT_ACKNOWLEDGE 我必须在配置工厂后调用 factory.setSessionTransacted(false)

调用 configurer.configure(factory, connectionFactory) 覆盖 sessionTransacted 的值,在我的例子中,它将它设置为 true,这使得 AUTO_ACKNOWLEDGECLIENT_ACKNOWLEDGE 无效。下面是DefaultJmsListenerContainerFactoryConfigurer.java的相关代码:

public void configure(DefaultJmsListenerContainerFactory factory, ConnectionFactory connectionFactory) {
...
...
        if (this.transactionManager != null) {
            factory.setTransactionManager(this.transactionManager);
        } else {
            factory.setSessionTransacted(true);
        }
...
...
          factory.setSessionAcknowledgeMode(Tibjms.EXPLICIT_CLIENT_ACKNOWLEDGE);
          //factory.setSessionTransacted(false);// here it’s  not working
          factory.setTaskExecutor(new SimpleAsyncTaskExecutor("KDBMessageListener-"));
          configurer.configure(factory, connectionFactory);
          factory.setSessionTransacted(false); //post configure ,session transacted is working