未设置 ActiveMQ RedeliveryPolicy
ActiveMQ RedeliveryPolicy not being set
我正在使用:
- SpringBoot 2.0.4
- ActiveMQ 5.15.5
- 阿帕奇骆驼 2.22.0
- Java1.8
- Groovy
- 行家
基本上,我有一个带有 Apache Camel 路由的 SpringBoot 应用程序,该路由使用来自 ActiveMQ 的事务消息。我需要在ActiveMQ上设置一个RedeliveryPolicy,这样当处理出现错误时,消息会被重试多次。
我已经用 ActiveMQ 的 beans 进行了配置 class,事务按预期工作,但 RedeliveryPolicy 不起作用。谁能帮我理解这有什么问题吗?
这是产生错误的消息的日志输出:
2018-10-23 10:35:28.005 DEBUG 10524 --- [mer[entryQueue]] o.a.c.s.spi.TransactionErrorHandler : Transaction begin (0x35d60381) redelivered(false) for (MessageId: ID:EPIC-LAP-25-50304-1540306817804-4:3:1:1:2 on ExchangeId: ID-EPIC-LAP-25-1540312510586-0-1))
2018-10-23 10:35:28.020 DEBUG 10524 --- [mer[entryQueue]] o.apache.camel.processor.SendProcessor : >>>> direct://middle Exchange[ID-EPIC-LAP-25-1540312510586-0-1]
2018-10-23 10:35:28.375 DEBUG 10524 --- [mer[entryQueue]] o.a.camel.processor.DefaultErrorHandler : Failed delivery for (MessageId: ID:EPIC-LAP-25-50304-1540306817804-4:3:1:1:2 on ExchangeId: ID-EPIC-LAP-25-1540312510586-0-1). On delivery attempt: 0 caught: java.lang.RuntimeException: ExceptionTest: Order Failed
2018-10-23 10:35:28.390 ERROR 10524 --- [mer[entryQueue]] o.a.camel.processor.DefaultErrorHandler : Failed delivery for (MessageId: ID:EPIC-LAP-25-50304-1540306817804-4:3:1:1:2 on ExchangeId: ID-EPIC-LAP-25-1540312510586-0-1). Exhausted after delivery attempt: 1 caught: java.lang.RuntimeException: ExceptionTest: Order Failed
这是我对 ActiveMQ 的配置 class:
import org.apache.activemq.ActiveMQConnectionFactory
import org.apache.activemq.RedeliveryPolicy
import org.apache.activemq.camel.component.ActiveMQComponent
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.jms.connection.JmsTransactionManager
import javax.jms.DeliveryMode
@Configuration
class ActiveMQConfiguration {
@Bean
ActiveMQConnectionFactory activeMQConnectionFactory() {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory()
activeMQConnectionFactory.brokerURL = 'tcp://localhost:61616'
activeMQConnectionFactory.userName = 'admin'
activeMQConnectionFactory.password = 'admin'
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy()
redeliveryPolicy.maximumRedeliveries = 3
redeliveryPolicy.redeliveryDelay = 150L
redeliveryPolicy.useExponentialBackOff = true
redeliveryPolicy.backOffMultiplier = 1.5
activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy)
activeMQConnectionFactory
}
@Bean
ActiveMQComponent activeMQComponent(@Qualifier('activeMQConnectionFactory')ActiveMQConnectionFactory activeMQConnectionFactory) {
ActiveMQComponent activeMQComponent = new ActiveMQComponent()
activeMQComponent.connectionFactory = activeMQConnectionFactory
activeMQComponent.transacted = true
activeMQComponent.transactionManager = txManager()
activeMQComponent.cacheLevelName = 'CACHE_CONSUMER'
activeMQComponent.lazyCreateTransactionManager = false
activeMQComponent.deliveryMode = DeliveryMode.PERSISTENT
activeMQComponent
}
@Bean
JmsTransactionManager txManager(@Qualifier('activeMQConnectionFactory') ActiveMQConnectionFactory activeMQConnectionFactory) {
JmsTransactionManager txManager = new JmsTransactionManager()
txManager.connectionFactory = activeMQConnectionFactory
txManager.rollbackOnCommitFailure = true
txManager
}
}
不久前,我遇到了 dlq 队列问题 - 并非代码中设置的所有参数都有效。我必须向 acitvemq 配置添加设置。是的,划分配置不是一个好的决定,但我没有找到另一个。
下面是我的 jms 配置 class 和一个示例队列配置 activemq.xml:
@Configuration
@EnableJms
public class JmsConfig {
private Environment env;
@Autowired
public void setEnv(Environment env) {
this.env = env;
}
@Bean(name = "activemq")
public ActiveMQComponent activemq(@Qualifier("activemqTransactionManager") JmsTransactionManager jmsTransactionManager,
@Qualifier("activemqConnectionFactory") ConnectionFactory connectionFactory) {
ActiveMQComponent activeMQComponent = new ActiveMQComponent();
activeMQComponent.setTransactionManager(jmsTransactionManager);
activeMQComponent.setConnectionFactory(connectionFactory);
return activeMQComponent;
}
@Bean(name = "activemqJmsTemplate")
public JmsTemplate jmsTemplate(@Qualifier("activemqConnectionFactory") ConnectionFactory connectionFactory) {
JmsTemplate template = new JmsTemplate();
template.setConnectionFactory(connectionFactory);
return template;
}
@Bean(name = "activemqTransactionPolicy")
public SpringTransactionPolicy activemqTransactionPolicy(
@Qualifier("activemqTransactionManager") JmsTransactionManager jmsTransactionManager) {
SpringTransactionPolicy springTransactionPolicy = new SpringTransactionPolicy(jmsTransactionManager);
springTransactionPolicy.setPropagationBehaviorName("PROPAGATION_REQUIRED");
return springTransactionPolicy;
}
@Bean(name = "activemqTransactionManager")
public JmsTransactionManager activemqTransactionManager(
@Qualifier("activemqConnectionFactory") ConnectionFactory connectionFactory) {
return new JmsTransactionManager(connectionFactory);
}
@Bean(name = "activemqConnectionFactory")
public ConnectionFactory connectionFactory(@Qualifier("activemqRedeliveryPolicy") RedeliveryPolicy redeliveryPolicy) {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL("tcp://" + env.getProperty("queue.url"));
connectionFactory.setTrustAllPackages(true);
RedeliveryPolicyMap map = connectionFactory.getRedeliveryPolicyMap();
map.put(new ActiveMQQueue("queueName.DLQ"), redeliveryPolicy);
return connectionFactory;
}
@Bean(name = "activemqRedeliveryPolicy")
public RedeliveryPolicy redeliveryPolicy() {
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setMaximumRedeliveries(0);
return redeliveryPolicy;
}
}
activevq.xml 的变化:
<destinationPolicy>
<policyMap>
<policyEntries>
<!--set dead letter queue for our queue. It name will be "myQueueName.DLQ"-->
<policyEntry queue="myQueueName">
<deadLetterStrategy>
<individualDeadLetterStrategy queuePrefix="" queueSuffix=".DLQ"/>
</deadLetterStrategy>
</policyEntry>
<policyEntry topic=">">
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="1000"/>
</pendingMessageLimitStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<plugins>
<redeliveryPlugin fallbackToDeadLetter="true" sendToDlqIfMaxRetriesExceeded="true">
<redeliveryPolicyMap>
<redeliveryPolicyMap>
<redeliveryPolicyEntries>
<!--Set the redelivery delay to one hour-->
<redeliveryPolicy queue="myQueueName.DLQ" maximumRedeliveries="-1" redeliveryDelay="3600000"/>
</redeliveryPolicyEntries>
</redeliveryPolicyMap>
</redeliveryPolicyMap>
</redeliveryPlugin>
</plugins>
这里有两个问题
1.您有两个事务管理器
由于您在Camel ActiveMQ组件的配置中有以下两行,您配置了两个事务管理器。这是问题的根源。
activeMQComponent.transacted = true // activates local JMS transactions
activeMQComponent.transactionManager = txManager() // additional tx manager
如果您只想从 ActiveMQ 使用事务,您不需要配置 Spring 事务管理器。
您的这两行配置足以与您的 ActiveMQ 代理进行本地交易。
activeMQComponent.transacted = true
activeMQComponent.lazyCreateTransactionManager = false
所以你应该删除这一行以及整个 txManager
bean
activeMQComponent.transactionManager = txManager()
如果您目前在您的 Camel 路线中设置了交易标志,您也必须将其删除。正如我所写的,即使您删除了所有这些,您从 ActiveMQ 消耗的路由仍然会被处理。
2。重新投递无效
您还没有发布您的 Camel 路由,但根据错误输出,我认为 broker 没有重新交付,因为错误由 Camel 处理。
Camel 错误处理程序 o.a.camel.processor.DefaultErrorHandler
在错误发生时启动,并且由于它处理错误,消息被提交给代理,因此不会发生重新传递。
尝试禁用 Camel 错误处理以查看代理是否重新传递错误消息。
errorHandler(noErrorHandler());
我正在使用:
- SpringBoot 2.0.4
- ActiveMQ 5.15.5
- 阿帕奇骆驼 2.22.0
- Java1.8
- Groovy
- 行家
基本上,我有一个带有 Apache Camel 路由的 SpringBoot 应用程序,该路由使用来自 ActiveMQ 的事务消息。我需要在ActiveMQ上设置一个RedeliveryPolicy,这样当处理出现错误时,消息会被重试多次。
我已经用 ActiveMQ 的 beans 进行了配置 class,事务按预期工作,但 RedeliveryPolicy 不起作用。谁能帮我理解这有什么问题吗?
这是产生错误的消息的日志输出:
2018-10-23 10:35:28.005 DEBUG 10524 --- [mer[entryQueue]] o.a.c.s.spi.TransactionErrorHandler : Transaction begin (0x35d60381) redelivered(false) for (MessageId: ID:EPIC-LAP-25-50304-1540306817804-4:3:1:1:2 on ExchangeId: ID-EPIC-LAP-25-1540312510586-0-1)) 2018-10-23 10:35:28.020 DEBUG 10524 --- [mer[entryQueue]] o.apache.camel.processor.SendProcessor : >>>> direct://middle Exchange[ID-EPIC-LAP-25-1540312510586-0-1] 2018-10-23 10:35:28.375 DEBUG 10524 --- [mer[entryQueue]] o.a.camel.processor.DefaultErrorHandler : Failed delivery for (MessageId: ID:EPIC-LAP-25-50304-1540306817804-4:3:1:1:2 on ExchangeId: ID-EPIC-LAP-25-1540312510586-0-1). On delivery attempt: 0 caught: java.lang.RuntimeException: ExceptionTest: Order Failed 2018-10-23 10:35:28.390 ERROR 10524 --- [mer[entryQueue]] o.a.camel.processor.DefaultErrorHandler : Failed delivery for (MessageId: ID:EPIC-LAP-25-50304-1540306817804-4:3:1:1:2 on ExchangeId: ID-EPIC-LAP-25-1540312510586-0-1). Exhausted after delivery attempt: 1 caught: java.lang.RuntimeException: ExceptionTest: Order Failed
这是我对 ActiveMQ 的配置 class:
import org.apache.activemq.ActiveMQConnectionFactory
import org.apache.activemq.RedeliveryPolicy
import org.apache.activemq.camel.component.ActiveMQComponent
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.jms.connection.JmsTransactionManager
import javax.jms.DeliveryMode
@Configuration
class ActiveMQConfiguration {
@Bean
ActiveMQConnectionFactory activeMQConnectionFactory() {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory()
activeMQConnectionFactory.brokerURL = 'tcp://localhost:61616'
activeMQConnectionFactory.userName = 'admin'
activeMQConnectionFactory.password = 'admin'
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy()
redeliveryPolicy.maximumRedeliveries = 3
redeliveryPolicy.redeliveryDelay = 150L
redeliveryPolicy.useExponentialBackOff = true
redeliveryPolicy.backOffMultiplier = 1.5
activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy)
activeMQConnectionFactory
}
@Bean
ActiveMQComponent activeMQComponent(@Qualifier('activeMQConnectionFactory')ActiveMQConnectionFactory activeMQConnectionFactory) {
ActiveMQComponent activeMQComponent = new ActiveMQComponent()
activeMQComponent.connectionFactory = activeMQConnectionFactory
activeMQComponent.transacted = true
activeMQComponent.transactionManager = txManager()
activeMQComponent.cacheLevelName = 'CACHE_CONSUMER'
activeMQComponent.lazyCreateTransactionManager = false
activeMQComponent.deliveryMode = DeliveryMode.PERSISTENT
activeMQComponent
}
@Bean
JmsTransactionManager txManager(@Qualifier('activeMQConnectionFactory') ActiveMQConnectionFactory activeMQConnectionFactory) {
JmsTransactionManager txManager = new JmsTransactionManager()
txManager.connectionFactory = activeMQConnectionFactory
txManager.rollbackOnCommitFailure = true
txManager
}
}
不久前,我遇到了 dlq 队列问题 - 并非代码中设置的所有参数都有效。我必须向 acitvemq 配置添加设置。是的,划分配置不是一个好的决定,但我没有找到另一个。 下面是我的 jms 配置 class 和一个示例队列配置 activemq.xml:
@Configuration
@EnableJms
public class JmsConfig {
private Environment env;
@Autowired
public void setEnv(Environment env) {
this.env = env;
}
@Bean(name = "activemq")
public ActiveMQComponent activemq(@Qualifier("activemqTransactionManager") JmsTransactionManager jmsTransactionManager,
@Qualifier("activemqConnectionFactory") ConnectionFactory connectionFactory) {
ActiveMQComponent activeMQComponent = new ActiveMQComponent();
activeMQComponent.setTransactionManager(jmsTransactionManager);
activeMQComponent.setConnectionFactory(connectionFactory);
return activeMQComponent;
}
@Bean(name = "activemqJmsTemplate")
public JmsTemplate jmsTemplate(@Qualifier("activemqConnectionFactory") ConnectionFactory connectionFactory) {
JmsTemplate template = new JmsTemplate();
template.setConnectionFactory(connectionFactory);
return template;
}
@Bean(name = "activemqTransactionPolicy")
public SpringTransactionPolicy activemqTransactionPolicy(
@Qualifier("activemqTransactionManager") JmsTransactionManager jmsTransactionManager) {
SpringTransactionPolicy springTransactionPolicy = new SpringTransactionPolicy(jmsTransactionManager);
springTransactionPolicy.setPropagationBehaviorName("PROPAGATION_REQUIRED");
return springTransactionPolicy;
}
@Bean(name = "activemqTransactionManager")
public JmsTransactionManager activemqTransactionManager(
@Qualifier("activemqConnectionFactory") ConnectionFactory connectionFactory) {
return new JmsTransactionManager(connectionFactory);
}
@Bean(name = "activemqConnectionFactory")
public ConnectionFactory connectionFactory(@Qualifier("activemqRedeliveryPolicy") RedeliveryPolicy redeliveryPolicy) {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL("tcp://" + env.getProperty("queue.url"));
connectionFactory.setTrustAllPackages(true);
RedeliveryPolicyMap map = connectionFactory.getRedeliveryPolicyMap();
map.put(new ActiveMQQueue("queueName.DLQ"), redeliveryPolicy);
return connectionFactory;
}
@Bean(name = "activemqRedeliveryPolicy")
public RedeliveryPolicy redeliveryPolicy() {
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setMaximumRedeliveries(0);
return redeliveryPolicy;
}
}
activevq.xml 的变化:
<destinationPolicy>
<policyMap>
<policyEntries>
<!--set dead letter queue for our queue. It name will be "myQueueName.DLQ"-->
<policyEntry queue="myQueueName">
<deadLetterStrategy>
<individualDeadLetterStrategy queuePrefix="" queueSuffix=".DLQ"/>
</deadLetterStrategy>
</policyEntry>
<policyEntry topic=">">
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="1000"/>
</pendingMessageLimitStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<plugins>
<redeliveryPlugin fallbackToDeadLetter="true" sendToDlqIfMaxRetriesExceeded="true">
<redeliveryPolicyMap>
<redeliveryPolicyMap>
<redeliveryPolicyEntries>
<!--Set the redelivery delay to one hour-->
<redeliveryPolicy queue="myQueueName.DLQ" maximumRedeliveries="-1" redeliveryDelay="3600000"/>
</redeliveryPolicyEntries>
</redeliveryPolicyMap>
</redeliveryPolicyMap>
</redeliveryPlugin>
</plugins>
这里有两个问题
1.您有两个事务管理器
由于您在Camel ActiveMQ组件的配置中有以下两行,您配置了两个事务管理器。这是问题的根源。
activeMQComponent.transacted = true // activates local JMS transactions
activeMQComponent.transactionManager = txManager() // additional tx manager
如果您只想从 ActiveMQ 使用事务,您不需要配置 Spring 事务管理器。
您的这两行配置足以与您的 ActiveMQ 代理进行本地交易。
activeMQComponent.transacted = true
activeMQComponent.lazyCreateTransactionManager = false
所以你应该删除这一行以及整个 txManager
bean
activeMQComponent.transactionManager = txManager()
如果您目前在您的 Camel 路线中设置了交易标志,您也必须将其删除。正如我所写的,即使您删除了所有这些,您从 ActiveMQ 消耗的路由仍然会被处理。
2。重新投递无效
您还没有发布您的 Camel 路由,但根据错误输出,我认为 broker 没有重新交付,因为错误由 Camel 处理。
Camel 错误处理程序 o.a.camel.processor.DefaultErrorHandler
在错误发生时启动,并且由于它处理错误,消息被提交给代理,因此不会发生重新传递。
尝试禁用 Camel 错误处理以查看代理是否重新传递错误消息。
errorHandler(noErrorHandler());