骆驼 JMS 交易不工作
Camel JMS Transacted not working
我正在尝试获取一个 Camel 路由 JMS->HTTP4,但在发生异常时消息没有传输到 ActiveMQ.DLQ,我不明白为什么。
下面的示例说明了如果 REST 服务的服务器关闭并且无法传递路由时可能发生的情况。
我得到正确的异常:
2018-01-18 12:30:50:962-[Camel (LRM-Relay) thread #5 - JmsConsumer[myIncomingQueue]] WARN o.a.c.s.s.TransactionErrorHandler - Transaction rollback (0x30a1c779) redelivered(false) for (MessageId: ID:MGR-MacBook-Pro.local-51837-1516262355358-4:2:1:1:16 on ExchangeId: ID-MGR-MacBook-Pro-local-1516275047663-0-1) caught: java.net.ConnectException: Cannot connect to CORE REST
2018-01-18 12:30:50:965-[Camel (LRM-Relay) thread #5 - JmsConsumer[myIncomingQueue]] WARN o.a.c.c.j.EndpointMessageListener - Execution of JMS message listener failed. Caused by: [org.apache.camel.RuntimeCamelException - java.net.ConnectException: Cannot connect to CORE REST]
org.apache.camel.RuntimeCamelException: java.net.ConnectException: Cannot connect to CORE REST …
但是消息已被使用并从队列中删除。我的假设是使用 transaction/transacted Camel 和 AMQ 会解决这个问题并将消息移动到 ActiveMQ.DLQ.
我已经阅读了 Camel in Action 第 1 版的第 9 章。并用谷歌搜索但没有找到解决我的问题的方法。
我知道我可以 create/define 我自己的 TransactionErrorHandler() 并将消息存储在我选择的队列中,但我的印象是这是使用事务处理时的默认设置…
我正在使用独立的 ActiveMQ 5.15.2 香草安装和配置。
骆驼 2.20.1
Java 8_144 MacOS 10.13.2
我的配置:
@Configuration
public class Config {
/**
* The Camel context.
*/
final CamelContext camelContext;
/**
* The Broker url.
*/
@Value("${jms.broker.url}")
private String brokerURL;
/**
* Instantiates a new Config.
*
* @param camelContext the sisyfos context
* @param metricRegistry the metric registry
*/
@Autowired
public Config(final CamelContext camelContext, final MetricRegistry metricRegistry) {
this.camelContext = camelContext;
this.metricRegistry = metricRegistry;
}
@Bean
public ActiveMQConnectionFactory activeMQConnectionFactory() {
final ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
activeMQConnectionFactory.setBrokerURL(brokerURL);
return activeMQConnectionFactory;
}
/**
* Pooled connection factory pooled connection factory.
*
* @return the pooled connection factory
*/
@Bean
@Primary
public PooledConnectionFactory pooledConnectionFactory() {
final PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
pooledConnectionFactory.setMaxConnections(8);
pooledConnectionFactory.setMaximumActiveSessionPerConnection(500);
pooledConnectionFactory.setConnectionFactory(activeMQConnectionFactory());
return pooledConnectionFactory;
}
/**
* Jms configuration jms configuration.
*
* @return the jms configuration
*/
@Bean
public JmsConfiguration jmsConfiguration() {
final JmsConfiguration jmsConfiguration = new JmsConfiguration();
jmsConfiguration.setConnectionFactory(pooledConnectionFactory());
jmsConfiguration.setTransacted(true);
jmsConfiguration.setTransactionManager(transactionManager());
jmsConfiguration.setConcurrentConsumers(10);
return jmsConfiguration;
}
/**
* Transaction manager jms transaction manager.
*
* @return the jms transaction manager
*/
@Bean
public JmsTransactionManager transactionManager() {
final JmsTransactionManager transactionManager = new JmsTransactionManager();
transactionManager.setConnectionFactory(pooledConnectionFactory());
return transactionManager;
}
/**
* Active mq component active mq component.
*
* @return the active mq component
*/
@Bean
public ActiveMQComponent activeMQComponent(JmsConfiguration jmsConfiguration,
PooledConnectionFactory pooledConnectionFactory,
JmsTransactionManager transactionManager) {
final ActiveMQComponent activeMQComponent = new ActiveMQComponent();
activeMQComponent.setConfiguration(jmsConfiguration);
activeMQComponent.setTransacted(true);
activeMQComponent.setUsePooledConnection(true);
activeMQComponent.setConnectionFactory(pooledConnectionFactory);
activeMQComponent.setTransactionManager(transactionManager);
return activeMQComponent;
}
}
我的路线:
@Component
public class SendToCore extends SpringRouteBuilder {
@Override
public void configure() throws Exception {
Logger.getLogger(SendToCore.class).info("Sending to CORE");
//No retries if first fails due to connection error
interceptSendToEndpoint("http4:*")
.choice()
.when(header("JMSRedelivered").isEqualTo("false"))
.throwException(new ConnectException("Cannot connect to CORE REST"))
.end();
from("activemq:queue:myIncomingQueue")
.transacted()
.setHeader(Exchange.CONTENT_TYPE, constant("application/xml"))
.to("http4:localhost/myRESTservice")
.log("${header.CamelHttpResponseCode}")
.end();
}
}
您可能会在某些 bean 中发现冗余声明,这就是我正在尝试解决的问题……
将 link 添加到我的 Github 回购中,并用一个小测试项目说明了这一点:
https://github.com/hakuseki/transacted
刚刚注意到,如果您希望Spring Boot 处理那些池和配置的生命周期,那么您不应该直接调用它们的方法,而是让它们作为方法签名中的参数提供
例如这个
public ActiveMQComponent activeMQComponent() {
应该是
public ActiveMQComponent activeMQComponent(JmsConfiguration config, ConnectionFactory cf, ...) {
然后SpringBoot会为你提供这些bean。
关于为什么你的交易不行,那你可以看看Camel in Action 2nd edition书中的一些交易例子:https://github.com/camelinaction/camelinaction2/tree/master/chapter12
这可能是SpringBoot自动配置的问题
如果消息丢失而不是进入 DLQ,Camel 的 ActiveMQ 组件会自动提交它们,而不是等到工作完成。
更新:使您的示例与 Java 配置一起工作的步骤
Notice: my config does not have a transaction manager because it is not needed for your case. Instead just set in the ActiveMQComponent
transacted
to true
and lazyCreateTransactionManager
to false
. Then you got a "local" transaction with your broker and that is all you need.
- 我从你的路由中删除了
.transacted()
(需要事务管理器,但不需要 "JMS local-transacted" 路由)
- 我在路由中注释掉了你的错误处理程序class(需要一个事务管理器,你可以使用默认的错误处理程序)
- 在
MainApplication
中禁用 JMS 和 ActiveMQ 的自动配置:@SpringBootApplication(exclude = { JmsAutoConfiguration.class, ActiveMQAutoConfiguration.class})
- 将您的 Java 配置替换为以下配置(改编自此问题:ConnectionFactory get destroyed before camel)
Java配置:
@Value("${jms.broker.url}")
String brokerURL;
@Bean
public ActiveMQConnectionFactory connectionFactory() {
final ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
activeMQConnectionFactory.setBrokerURL(brokerURL);
return activeMQConnectionFactory;
}
@Bean
@Primary
public PooledConnectionFactory pooledConnectionFactory(ConnectionFactory cf) {
final PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
pooledConnectionFactory.setMaxConnections(1);
pooledConnectionFactory.setConnectionFactory(cf);
return pooledConnectionFactory;
}
@Bean(name = "activemq")
@ConditionalOnClass(ActiveMQComponent.class)
public ActiveMQComponent activeMQComponent(ConnectionFactory connectionFactory) {
ActiveMQComponent activeMQComponent = new ActiveMQComponent();
activeMQComponent.setConnectionFactory(connectionFactory);
activeMQComponent.setTransacted(true);
activeMQComponent.setLazyCreateTransactionManager(false);
return activeMQComponent;
}
最后,只是为了"run"路由,我添加了一个小的Camel Route测试
@RunWith(CamelSpringBootRunner.class)
@SpringBootTest(classes = MainApplication.class)
public class SampleCamelApplicationTest {
@Produce(uri = "activemq:queue:myIncomingQueue")
protected ProducerTemplate template;
@Test
public void shouldProduceMessages() throws Exception {
template.sendBody("test");
Thread.sleep(20000); //wait for ActiveMQ redeliveries
}
}
如果我 运行 这个测试,消息将发送到 ActiveMQ.DLQ
。
希望这对您有所帮助
我正在尝试获取一个 Camel 路由 JMS->HTTP4,但在发生异常时消息没有传输到 ActiveMQ.DLQ,我不明白为什么。
下面的示例说明了如果 REST 服务的服务器关闭并且无法传递路由时可能发生的情况。
我得到正确的异常:
2018-01-18 12:30:50:962-[Camel (LRM-Relay) thread #5 - JmsConsumer[myIncomingQueue]] WARN o.a.c.s.s.TransactionErrorHandler - Transaction rollback (0x30a1c779) redelivered(false) for (MessageId: ID:MGR-MacBook-Pro.local-51837-1516262355358-4:2:1:1:16 on ExchangeId: ID-MGR-MacBook-Pro-local-1516275047663-0-1) caught: java.net.ConnectException: Cannot connect to CORE REST
2018-01-18 12:30:50:965-[Camel (LRM-Relay) thread #5 - JmsConsumer[myIncomingQueue]] WARN o.a.c.c.j.EndpointMessageListener - Execution of JMS message listener failed. Caused by: [org.apache.camel.RuntimeCamelException - java.net.ConnectException: Cannot connect to CORE REST]
org.apache.camel.RuntimeCamelException: java.net.ConnectException: Cannot connect to CORE REST …
但是消息已被使用并从队列中删除。我的假设是使用 transaction/transacted Camel 和 AMQ 会解决这个问题并将消息移动到 ActiveMQ.DLQ.
我已经阅读了 Camel in Action 第 1 版的第 9 章。并用谷歌搜索但没有找到解决我的问题的方法。
我知道我可以 create/define 我自己的 TransactionErrorHandler() 并将消息存储在我选择的队列中,但我的印象是这是使用事务处理时的默认设置…
我正在使用独立的 ActiveMQ 5.15.2 香草安装和配置。
骆驼 2.20.1
Java 8_144 MacOS 10.13.2
我的配置:
@Configuration
public class Config {
/**
* The Camel context.
*/
final CamelContext camelContext;
/**
* The Broker url.
*/
@Value("${jms.broker.url}")
private String brokerURL;
/**
* Instantiates a new Config.
*
* @param camelContext the sisyfos context
* @param metricRegistry the metric registry
*/
@Autowired
public Config(final CamelContext camelContext, final MetricRegistry metricRegistry) {
this.camelContext = camelContext;
this.metricRegistry = metricRegistry;
}
@Bean
public ActiveMQConnectionFactory activeMQConnectionFactory() {
final ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
activeMQConnectionFactory.setBrokerURL(brokerURL);
return activeMQConnectionFactory;
}
/**
* Pooled connection factory pooled connection factory.
*
* @return the pooled connection factory
*/
@Bean
@Primary
public PooledConnectionFactory pooledConnectionFactory() {
final PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
pooledConnectionFactory.setMaxConnections(8);
pooledConnectionFactory.setMaximumActiveSessionPerConnection(500);
pooledConnectionFactory.setConnectionFactory(activeMQConnectionFactory());
return pooledConnectionFactory;
}
/**
* Jms configuration jms configuration.
*
* @return the jms configuration
*/
@Bean
public JmsConfiguration jmsConfiguration() {
final JmsConfiguration jmsConfiguration = new JmsConfiguration();
jmsConfiguration.setConnectionFactory(pooledConnectionFactory());
jmsConfiguration.setTransacted(true);
jmsConfiguration.setTransactionManager(transactionManager());
jmsConfiguration.setConcurrentConsumers(10);
return jmsConfiguration;
}
/**
* Transaction manager jms transaction manager.
*
* @return the jms transaction manager
*/
@Bean
public JmsTransactionManager transactionManager() {
final JmsTransactionManager transactionManager = new JmsTransactionManager();
transactionManager.setConnectionFactory(pooledConnectionFactory());
return transactionManager;
}
/**
* Active mq component active mq component.
*
* @return the active mq component
*/
@Bean
public ActiveMQComponent activeMQComponent(JmsConfiguration jmsConfiguration,
PooledConnectionFactory pooledConnectionFactory,
JmsTransactionManager transactionManager) {
final ActiveMQComponent activeMQComponent = new ActiveMQComponent();
activeMQComponent.setConfiguration(jmsConfiguration);
activeMQComponent.setTransacted(true);
activeMQComponent.setUsePooledConnection(true);
activeMQComponent.setConnectionFactory(pooledConnectionFactory);
activeMQComponent.setTransactionManager(transactionManager);
return activeMQComponent;
}
}
我的路线:
@Component
public class SendToCore extends SpringRouteBuilder {
@Override
public void configure() throws Exception {
Logger.getLogger(SendToCore.class).info("Sending to CORE");
//No retries if first fails due to connection error
interceptSendToEndpoint("http4:*")
.choice()
.when(header("JMSRedelivered").isEqualTo("false"))
.throwException(new ConnectException("Cannot connect to CORE REST"))
.end();
from("activemq:queue:myIncomingQueue")
.transacted()
.setHeader(Exchange.CONTENT_TYPE, constant("application/xml"))
.to("http4:localhost/myRESTservice")
.log("${header.CamelHttpResponseCode}")
.end();
}
}
您可能会在某些 bean 中发现冗余声明,这就是我正在尝试解决的问题……
将 link 添加到我的 Github 回购中,并用一个小测试项目说明了这一点:
https://github.com/hakuseki/transacted
刚刚注意到,如果您希望Spring Boot 处理那些池和配置的生命周期,那么您不应该直接调用它们的方法,而是让它们作为方法签名中的参数提供
例如这个
public ActiveMQComponent activeMQComponent() {
应该是
public ActiveMQComponent activeMQComponent(JmsConfiguration config, ConnectionFactory cf, ...) {
然后SpringBoot会为你提供这些bean。
关于为什么你的交易不行,那你可以看看Camel in Action 2nd edition书中的一些交易例子:https://github.com/camelinaction/camelinaction2/tree/master/chapter12
这可能是SpringBoot自动配置的问题
如果消息丢失而不是进入 DLQ,Camel 的 ActiveMQ 组件会自动提交它们,而不是等到工作完成。
更新:使您的示例与 Java 配置一起工作的步骤
Notice: my config does not have a transaction manager because it is not needed for your case. Instead just set in the
ActiveMQComponent
transacted
totrue
andlazyCreateTransactionManager
tofalse
. Then you got a "local" transaction with your broker and that is all you need.
- 我从你的路由中删除了
.transacted()
(需要事务管理器,但不需要 "JMS local-transacted" 路由) - 我在路由中注释掉了你的错误处理程序class(需要一个事务管理器,你可以使用默认的错误处理程序)
- 在
MainApplication
中禁用 JMS 和 ActiveMQ 的自动配置:@SpringBootApplication(exclude = { JmsAutoConfiguration.class, ActiveMQAutoConfiguration.class})
- 将您的 Java 配置替换为以下配置(改编自此问题:ConnectionFactory get destroyed before camel)
Java配置:
@Value("${jms.broker.url}")
String brokerURL;
@Bean
public ActiveMQConnectionFactory connectionFactory() {
final ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
activeMQConnectionFactory.setBrokerURL(brokerURL);
return activeMQConnectionFactory;
}
@Bean
@Primary
public PooledConnectionFactory pooledConnectionFactory(ConnectionFactory cf) {
final PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
pooledConnectionFactory.setMaxConnections(1);
pooledConnectionFactory.setConnectionFactory(cf);
return pooledConnectionFactory;
}
@Bean(name = "activemq")
@ConditionalOnClass(ActiveMQComponent.class)
public ActiveMQComponent activeMQComponent(ConnectionFactory connectionFactory) {
ActiveMQComponent activeMQComponent = new ActiveMQComponent();
activeMQComponent.setConnectionFactory(connectionFactory);
activeMQComponent.setTransacted(true);
activeMQComponent.setLazyCreateTransactionManager(false);
return activeMQComponent;
}
最后,只是为了"run"路由,我添加了一个小的Camel Route测试
@RunWith(CamelSpringBootRunner.class)
@SpringBootTest(classes = MainApplication.class)
public class SampleCamelApplicationTest {
@Produce(uri = "activemq:queue:myIncomingQueue")
protected ProducerTemplate template;
@Test
public void shouldProduceMessages() throws Exception {
template.sendBody("test");
Thread.sleep(20000); //wait for ActiveMQ redeliveries
}
}
如果我 运行 这个测试,消息将发送到 ActiveMQ.DLQ
。
希望这对您有所帮助