在 DefaultMessageListenerContainer 中配置 sessionAcknowledgeMode
Configuring sessionAcknowledgeMode in DefaultMessageListenerContainer
我有一个设置,我必须从 ActiveMQ 代理中的队列中读取消息。阅读邮件后,我必须对邮件进行长运行操作。
由于对消息的这种长时间 运行 操作,我想尽快确认消息,以便释放代理上的资源。计划是在收到消息后执行以下步骤:
- 从 ActiveMQ 获取消息
- 将消息插入数据库
- 确认消息
- 对消息
做一些长运行操作
我已经了解了 JMS 和不同的确认模式,所以在尝试这样做之前,我决定设置一个应用程序,我可以在其中尝试不同的模式以了解它们是如何处理的,不幸的是我似乎无法得到我想要的输出。
根据此答案中的信息 as well as https://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/jms/listener/DefaultMessageListenerContainer.html 我认为通过使用 AUTO_ACKNOWLEDGE 消息会在我的侦听器被调用之前得到确认,但如果我在侦听器中抛出异常,则消息是重新交付。
我尝试过将 setSessionTransacted 设置为 true 和不设置为 true,但在这两种情况下我得到相同的输出。在 JmsListener 中抛出异常时重新传递消息。
JMS的配置
@Bean
public ConnectionFactory connectionFactory() {
ConnectionFactory connectionFactory =
new ActiveMQConnectionFactory(jmsConfig.getBrokerUrl());
return connectionFactory;
}
@Bean
public JmsTemplate jmstemplate(){
JmsTemplate jmsTemplate = new JmsTemplate();
jmsTemplate.setConnectionFactory(connectionFactory());
//jmsTemplate.setSessionTransacted(true);
jmsTemplate.setDefaultDestinationName( jmsConfig.getQueueIn() );
return jmsTemplate;
}
@Bean
public JmsListenerContainerFactory jmsListenerContainerFactoryxxxx(
ConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
//factory.setConcurrency("1");
factory.setSessionTransacted(true);
configurer.configure(factory, connectionFactory);
return factory;
}
JmsListener
@JmsListener(destination = "B1Q1", containerFactory = "jmsListenerContainerFactoryxxxx")
public void receiveMessage(Message message) {
try {
TextMessage m = (TextMessage) message;
String messageText = m.getText();
int retryNum = message.getIntProperty("JMSXDeliveryCount");
long s = message.getLongProperty("JMSTimestamp");
Date d = new Date( s );
String dbText = String.format("Retry %d. Message: %s", retryNum, messageText);
if ( messageText.toLowerCase().contains("exception") ) {
logger.info("Creating exception for retry: {}", retryNum);
throw new RuntimeException();
}
} catch (JMSException e) {
logger.error("Exception!!", e);
}
}
我应该如何更改代码,以便在抛出异常时不会重新传递消息?
回到我的应用程序,我将在其中将消息插入数据库。在将消息插入数据库之后但在执行 long-运行 任务之前,我如何通过 JmsListener 确认消息?
为了能够使用 AUTO_ACKNOWLEDGE
或 CLIENT_ACKNOWLEDGE
我必须在配置工厂后调用 factory.setSessionTransacted(false)
。
调用 configurer.configure(factory, connectionFactory)
覆盖 sessionTransacted
的值,在我的例子中,它将它设置为 true
,这使得 AUTO_ACKNOWLEDGE
或 CLIENT_ACKNOWLEDGE
无效。下面是DefaultJmsListenerContainerFactoryConfigurer.java
的相关代码:
public void configure(DefaultJmsListenerContainerFactory factory, ConnectionFactory connectionFactory) {
...
...
if (this.transactionManager != null) {
factory.setTransactionManager(this.transactionManager);
} else {
factory.setSessionTransacted(true);
}
...
...
factory.setSessionAcknowledgeMode(Tibjms.EXPLICIT_CLIENT_ACKNOWLEDGE);
//factory.setSessionTransacted(false);// here it’s not working
factory.setTaskExecutor(new SimpleAsyncTaskExecutor("KDBMessageListener-"));
configurer.configure(factory, connectionFactory);
factory.setSessionTransacted(false); //post configure ,session transacted is working
我有一个设置,我必须从 ActiveMQ 代理中的队列中读取消息。阅读邮件后,我必须对邮件进行长运行操作。
由于对消息的这种长时间 运行 操作,我想尽快确认消息,以便释放代理上的资源。计划是在收到消息后执行以下步骤:
- 从 ActiveMQ 获取消息
- 将消息插入数据库
- 确认消息
- 对消息 做一些长运行操作
我已经了解了 JMS 和不同的确认模式,所以在尝试这样做之前,我决定设置一个应用程序,我可以在其中尝试不同的模式以了解它们是如何处理的,不幸的是我似乎无法得到我想要的输出。
根据此答案中的信息 as well as https://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/jms/listener/DefaultMessageListenerContainer.html 我认为通过使用 AUTO_ACKNOWLEDGE 消息会在我的侦听器被调用之前得到确认,但如果我在侦听器中抛出异常,则消息是重新交付。
我尝试过将 setSessionTransacted 设置为 true 和不设置为 true,但在这两种情况下我得到相同的输出。在 JmsListener 中抛出异常时重新传递消息。
JMS的配置
@Bean
public ConnectionFactory connectionFactory() {
ConnectionFactory connectionFactory =
new ActiveMQConnectionFactory(jmsConfig.getBrokerUrl());
return connectionFactory;
}
@Bean
public JmsTemplate jmstemplate(){
JmsTemplate jmsTemplate = new JmsTemplate();
jmsTemplate.setConnectionFactory(connectionFactory());
//jmsTemplate.setSessionTransacted(true);
jmsTemplate.setDefaultDestinationName( jmsConfig.getQueueIn() );
return jmsTemplate;
}
@Bean
public JmsListenerContainerFactory jmsListenerContainerFactoryxxxx(
ConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
//factory.setConcurrency("1");
factory.setSessionTransacted(true);
configurer.configure(factory, connectionFactory);
return factory;
}
JmsListener
@JmsListener(destination = "B1Q1", containerFactory = "jmsListenerContainerFactoryxxxx")
public void receiveMessage(Message message) {
try {
TextMessage m = (TextMessage) message;
String messageText = m.getText();
int retryNum = message.getIntProperty("JMSXDeliveryCount");
long s = message.getLongProperty("JMSTimestamp");
Date d = new Date( s );
String dbText = String.format("Retry %d. Message: %s", retryNum, messageText);
if ( messageText.toLowerCase().contains("exception") ) {
logger.info("Creating exception for retry: {}", retryNum);
throw new RuntimeException();
}
} catch (JMSException e) {
logger.error("Exception!!", e);
}
}
我应该如何更改代码,以便在抛出异常时不会重新传递消息?
回到我的应用程序,我将在其中将消息插入数据库。在将消息插入数据库之后但在执行 long-运行 任务之前,我如何通过 JmsListener 确认消息?
为了能够使用 AUTO_ACKNOWLEDGE
或 CLIENT_ACKNOWLEDGE
我必须在配置工厂后调用 factory.setSessionTransacted(false)
。
调用 configurer.configure(factory, connectionFactory)
覆盖 sessionTransacted
的值,在我的例子中,它将它设置为 true
,这使得 AUTO_ACKNOWLEDGE
或 CLIENT_ACKNOWLEDGE
无效。下面是DefaultJmsListenerContainerFactoryConfigurer.java
的相关代码:
public void configure(DefaultJmsListenerContainerFactory factory, ConnectionFactory connectionFactory) {
...
...
if (this.transactionManager != null) {
factory.setTransactionManager(this.transactionManager);
} else {
factory.setSessionTransacted(true);
}
...
...
factory.setSessionAcknowledgeMode(Tibjms.EXPLICIT_CLIENT_ACKNOWLEDGE);
//factory.setSessionTransacted(false);// here it’s not working
factory.setTaskExecutor(new SimpleAsyncTaskExecutor("KDBMessageListener-"));
configurer.configure(factory, connectionFactory);
factory.setSessionTransacted(false); //post configure ,session transacted is working