How to manually acknowledge messages from ActiveMQ using Spring JmsListener

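I am using ActiveMQ (with JMS) together with Spring's JmsListener. I can consume messages from an ActiveMQ queue, but the listener uses AUTO_ACKNOWLEDGE. How can I set CLIENT_ACKNOWLEDGE so that the next message is consumed only after I have acknowledged the current one?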

@Bean
public ActiveMQConnectionFactory connectionFactory() {
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
    activeMQConnectionFactory.setTransactedIndividualAck(true);
    activeMQConnectionFactory.setUserName(mqUserName);
    activeMQConnectionFactory.setPassword(mqPassword);
    activeMQConnectionFactory.setBrokerURL(mqUrl);
    return activeMQConnectionFactory;
}

@Bean
public JmsListenerContainerFactory myFactory(ConnectionFactory connectionFactory, DefaultJmsListenerContainerFactoryConfigurer configurer) {

    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setErrorHandler(t -> {
        // Log the throwable itself; t.getCause() may be null.
        logger.error("An error has occurred in the transaction", t);
    });

    configurer.configure(factory, connectionFactory);
    factory.setConcurrency("4");

    // You could still override some of Boot's defaults if necessary.
    return factory;
}

@Bean
public MessageConverter jacksonJmsMessageConverter() {
    MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
    converter.setTargetType(MessageType.TEXT);
    converter.setTypeIdPropertyName("_type");
    return converter;
}

@JmsListener(destination = "QUEUE_1", containerFactory = "myFactory", concurrency = "2")
public void receiveImgGenerationMessage(String transaction) {
    logger.info("message received in queue " + transaction);
    // Call another API to process the message and perform some operations.
    // Once processing has finished, the message must be acknowledged
    // so that the next message can be consumed.
}

// jmsTemplate bean
public void sendmessage() {
    for (int i = 0; i < 10; i++) {
        jmsTemplate.convertAndSend("QUEUE_1", i);
    }
}

You should set CLIENT_ACKNOWLEDGE mode by calling the setSessionAcknowledgeMode method on your org.springframework.jms.config.DefaultJmsListenerContainerFactory instance, for example:

@Bean
public JmsListenerContainerFactory myFactory(ConnectionFactory connectionFactory, DefaultJmsListenerContainerFactoryConfigurer configurer) {

    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setErrorHandler(t -> {
        // Log the throwable itself; t.getCause() may be null.
        logger.error("An error has occurred in the transaction", t);
    });

    // Apply Boot's defaults first.
    configurer.configure(factory, connectionFactory);
    factory.setConcurrency("4");

    // Override Boot's defaults after configure(...) so the setting is not overwritten.
    factory.setSessionAcknowledgeMode(javax.jms.Session.CLIENT_ACKNOWLEDGE);
    return factory;
}
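
With that factory in place, a listener might look roughly like the sketch below. It relies on the behavior described in the JavaDoc quoted further down: in CLIENT_ACKNOWLEDGE mode the container acknowledges the message after the listener method returns normally, and a thrown exception leaves it unacknowledged. The class name ImgGenerationListener, the SLF4J logger, and the process(...) helper are assumptions made for illustration and are not part of the original code. Depending on the Spring Boot version, the configurer may also enable transacted sessions, in which case the acknowledge mode is not used; that is worth checking against your version.

```java
import javax.jms.JMSException;
import javax.jms.Message;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class ImgGenerationListener {

    private static final Logger logger = LoggerFactory.getLogger(ImgGenerationListener.class);

    @JmsListener(destination = "QUEUE_1", containerFactory = "myFactory")
    public void receiveImgGenerationMessage(Message message) throws JMSException {
        logger.info("Received message {} (redelivered: {})",
                message.getJMSMessageID(), message.getJMSRedelivered());

        // Hypothetical processing step; replace with the real API call.
        boolean processed = process(message);

        if (!processed) {
            // Throwing keeps the message unacknowledged, so it is eligible for redelivery.
            throw new IllegalStateException("Processing failed for " + message.getJMSMessageID());
        }
        // Returning normally lets the container acknowledge the message.
    }

    // Placeholder for the caller's own processing logic (an assumption, not from the original).
    private boolean process(Message message) {
        return true;
    }
}
```

Accepting the raw javax.jms.Message gives access to delivery metadata such as getJMSRedelivered(), which can help when deciding how to handle a repeated delivery.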

This is discussed in the Spring JMS JavaDoc:

The listener container offers the following message acknowledgment options:

  • "sessionAcknowledgeMode" set to "AUTO_ACKNOWLEDGE" (default): This mode is container-dependent: For DefaultMessageListenerContainer, it means automatic message acknowledgment before listener execution, with no redelivery in case of an exception and no redelivery in case of other listener execution interruptions either. For SimpleMessageListenerContainer, it means automatic message acknowledgment after listener execution, with no redelivery in case of a user exception thrown but potential redelivery in case of the JVM dying during listener execution. In order to consistently arrange for redelivery with any container variant, consider "CLIENT_ACKNOWLEDGE" mode or - preferably - setting "sessionTransacted" to "true" instead.

  • "sessionAcknowledgeMode" set to "DUPS_OK_ACKNOWLEDGE": Lazy message acknowledgment during (DefaultMessageListenerContainer) or shortly after (SimpleMessageListenerContainer) listener execution; no redelivery in case of a user exception thrown but potential redelivery in case of the JVM dying during listener execution. In order to consistently arrange for redelivery with any container variant, consider "CLIENT_ACKNOWLEDGE" mode or - preferably - setting "sessionTransacted" to "true" instead.

  • "sessionAcknowledgeMode" set to "CLIENT_ACKNOWLEDGE": Automatic message acknowledgment after successful listener execution; best-effort redelivery in case of a user exception thrown as well as in case of other listener execution interruptions (such as the JVM dying).

  • "sessionTransacted" set to "true": Transactional acknowledgment after successful listener execution; guaranteed redelivery in case of a user exception thrown as well as in case of other listener execution interruptions (such as the JVM dying).
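
To make the CLIENT_ACKNOWLEDGE entry above more concrete, here is a toy in-memory model in plain Java. It does not use the JMS API or a broker; the class ClientAckModel and its methods are invented for illustration. It only shows the basic idea that a message is removed from the queue after the listener returns normally and stays available for redelivery when the listener throws. Real redelivery also depends on the broker's redelivery policy, which this sketch ignores.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

/** Toy in-memory model (not the JMS API) of "acknowledge only after the listener succeeds". */
public class ClientAckModel {

    private final Deque<String> queue = new ArrayDeque<>();
    private final List<String> acknowledged = new ArrayList<>();

    public void send(String message) {
        queue.addLast(message);
    }

    /**
     * Delivers the head message to the listener. The message is removed
     * (acknowledged) only if the listener returns normally; if it throws,
     * the message stays at the head and is delivered again on the next call.
     * Returns true when the message was acknowledged.
     */
    public boolean deliverNext(Consumer<String> listener) {
        String message = queue.peekFirst();
        if (message == null) {
            return false; // nothing to deliver
        }
        try {
            listener.accept(message);
        } catch (RuntimeException e) {
            return false; // not acknowledged: message remains queued
        }
        queue.removeFirst();
        acknowledged.add(message);
        return true;
    }

    public List<String> acknowledged() {
        return acknowledged;
    }

    public int pending() {
        return queue.size();
    }

    public static void main(String[] args) {
        ClientAckModel model = new ClientAckModel();
        model.send("tx-1");
        model.send("tx-2");

        // First attempt fails, so tx-1 is not acknowledged and stays pending.
        boolean first = model.deliverNext(m -> {
            throw new IllegalStateException("processing failed");
        });
        // Second attempt succeeds, so tx-1 is acknowledged and tx-2 becomes the head.
        boolean second = model.deliverNext(m -> { });

        // prints: false true [tx-1] pending=1
        System.out.println(first + " " + second + " " + model.acknowledged()
                + " pending=" + model.pending());
    }
}
```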

In Spring Boot you can also set this in application.properties:

spring.jms.listener.acknowledge-mode=CLIENT