How to manually acknowledge messages from ActiveMQ using Spring JmsListener
I am using ActiveMQ (over JMS) with Spring's JmsListener. I can consume messages from the ActiveMQ queue, but the listener uses AUTO_ACKNOWLEDGE. How can I set CLIENT_ACKNOWLEDGE so that the next message is consumed only after I have acknowledged the current one?
@Bean
public ActiveMQConnectionFactory connectionFactory() {
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
    activeMQConnectionFactory.setTransactedIndividualAck(true);
    activeMQConnectionFactory.setUserName(mqUserName);
    activeMQConnectionFactory.setPassword(mqPassword);
    activeMQConnectionFactory.setBrokerURL(mqUrl);
    return activeMQConnectionFactory;
}
@Bean
public JmsListenerContainerFactory<?> myFactory(ConnectionFactory connectionFactory, DefaultJmsListenerContainerFactoryConfigurer configurer) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setErrorHandler(t -> {
        logger.info("An error has occurred in the transaction");
        // Log the throwable itself; t.getCause() may be null.
        logger.error(t.getMessage(), t);
    });
    configurer.configure(factory, connectionFactory);
    // You could still override some of Boot's defaults if necessary.
    factory.setConcurrency("4");
    return factory;
}
@Bean
public MessageConverter jacksonJmsMessageConverter() {
    MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
    converter.setTargetType(MessageType.TEXT);
    converter.setTypeIdPropertyName("_type");
    return converter;
}
@JmsListener(destination = "QUEUE_1", containerFactory = "myFactory", concurrency = "2")
public void receiveImgGenerationMessage(String transaction) {
    logger.info("message received in queue " + transaction);
    // I will call another API to process the message and perform some operation.
    // After the message is processed, I have to acknowledge it
    // so that I can consume the next message.
}
// jmsTemplate bean
public void sendmessage() {
    for (int i = 0; i < 10; i++) {
        jmsTemplate.convertAndSend("QUEUE_1", i);
    }
}
Use the setSessionAcknowledgeMode method on your org.springframework.jms.config.DefaultJmsListenerContainerFactory instance to set CLIENT_ACKNOWLEDGE mode, for example:
@Bean
public JmsListenerContainerFactory<?> myFactory(ConnectionFactory connectionFactory, DefaultJmsListenerContainerFactoryConfigurer configurer) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setErrorHandler(t -> {
        logger.info("An error has occurred in the transaction");
        // Log the throwable itself; t.getCause() may be null.
        logger.error(t.getMessage(), t);
    });
    configurer.configure(factory, connectionFactory);
    // You could still override some of Boot's defaults if necessary.
    // Set the mode after configure() so that Boot's settings cannot overwrite it.
    // Per the JMS specification, the acknowledge mode is ignored while the session is transacted.
    factory.setSessionAcknowledgeMode(javax.jms.Session.CLIENT_ACKNOWLEDGE);
    factory.setConcurrency("4");
    return factory;
}
This is discussed in the Spring JMS Javadoc:
The listener container offers the following message acknowledgment options:
"sessionAcknowledgeMode" set to "AUTO_ACKNOWLEDGE" (default): This mode is container-dependent: For DefaultMessageListenerContainer, it means automatic message acknowledgment before listener execution, with no redelivery in case of an exception and no redelivery in case of other listener execution interruptions either. For SimpleMessageListenerContainer, it means automatic message acknowledgment after listener execution, with no redelivery in case of a user exception thrown but potential redelivery in case of the JVM dying during listener execution. In order to consistently arrange for redelivery with any container variant, consider "CLIENT_ACKNOWLEDGE" mode or - preferably - setting "sessionTransacted" to "true" instead.
"sessionAcknowledgeMode" set to "DUPS_OK_ACKNOWLEDGE": Lazy message acknowledgment during (DefaultMessageListenerContainer) or shortly after (SimpleMessageListenerContainer) listener execution; no redelivery in case of a user exception thrown but potential redelivery in case of the JVM dying during listener execution. In order to consistently arrange for redelivery with any container variant, consider "CLIENT_ACKNOWLEDGE" mode or - preferably - setting "sessionTransacted" to "true" instead.
"sessionAcknowledgeMode" set to "CLIENT_ACKNOWLEDGE": Automatic message acknowledgment after successful listener execution; best-effort redelivery in case of a user exception thrown as well as in case of other listener execution interruptions (such as the JVM dying).
"sessionTransacted" set to "true": Transactional acknowledgment after successful listener execution; guaranteed redelivery in case of a user exception thrown as well as in case of other listener execution interruptions (such as the JVM dying).
You can also set this in Spring Boot's application.properties:
spring.jms.listener.acknowledge-mode=CLIENT
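To make the Javadoc wording quoted above more concrete, here is a minimal sketch of why the timing of the acknowledgment decides whether a failed message can be redelivered. It is a toy in-memory model in plain Java, not Spring or ActiveMQ code: the class AckTimingDemo, the enum AckTiming, and the method deliverAll are hypothetical names introduced only for this illustration. BEFORE_LISTENER loosely corresponds to what the Javadoc describes for AUTO_ACKNOWLEDGE on DefaultMessageListenerContainer, and AFTER_SUCCESSFUL_LISTENER to what it describes for CLIENT_ACKNOWLEDGE.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

/** Toy model of acknowledgment timing; all names here are hypothetical. */
class AckTimingDemo {

    /** When the simulated container acknowledges a message. */
    enum AckTiming { BEFORE_LISTENER, AFTER_SUCCESSFUL_LISTENER }

    /**
     * Delivers every message to the listener and returns the messages that
     * are still unacknowledged afterwards, i.e. the ones a broker could redeliver.
     */
    static List<String> deliverAll(List<String> messages, AckTiming timing, Consumer<String> listener) {
        Deque<String> queue = new ArrayDeque<>(messages);
        List<String> unacknowledged = new ArrayList<>();
        while (!queue.isEmpty()) {
            String message = queue.poll();
            // An early acknowledgment happens before the listener even runs.
            boolean acknowledged = (timing == AckTiming.BEFORE_LISTENER);
            try {
                listener.accept(message);
                acknowledged = true; // the listener returned normally
            } catch (RuntimeException e) {
                // The listener failed: only an early acknowledgment survives.
            }
            if (!acknowledged) {
                unacknowledged.add(message);
            }
        }
        return unacknowledged;
    }

    public static void main(String[] args) {
        List<String> messages = List.of("1", "2", "3");
        // Simulated listener that fails while processing message "2".
        Consumer<String> failsOnTwo = m -> {
            if (m.equals("2")) {
                throw new IllegalStateException("processing failed for " + m);
            }
        };
        System.out.println(deliverAll(messages, AckTiming.BEFORE_LISTENER, failsOnTwo));
        System.out.println(deliverAll(messages, AckTiming.AFTER_SUCCESSFUL_LISTENER, failsOnTwo));
    }
}
```

Running main prints [] and then [2]: with the early acknowledgment the failed message is already gone, while acknowledging only after a successful listener call leaves it available for redelivery. A real broker adds redelivery policies, prefetch, and session scoping that this sketch deliberately ignores.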