How to join Spring JMS transactions from two different connection factories?
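I am using different connection factories for sending and receiving messages, and I am running into partial commits when a delivery fails. The jms:message-driven-channel-adapter uses receiveConnectionFactory to receive messages from the queue. The jms:outbound-channel-adapter uses deliverConnectionFactory to send multiple messages to downstream queues. We have only one JmsTransactionManager, which uses receiveConnectionFactory, and the jms:outbound-channel-adapter is configured with session-transacted="true".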
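<!-- Namespace declarations added so the tx:, aop: and jms: prefixes resolve.
     The inputChannel, errorChannel, myTaskExecutor and inputQueue beans are not shown in the question. -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:jms="http://www.springframework.org/schema/integration/jms"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd
           http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
           http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd">

    <!-- The only transaction manager; it manages receiveConnectionFactory only -->
    <bean id="transactionManager"
          class="org.springframework.jms.connection.JmsTransactionManager">
        <property name="connectionFactory" ref="receiveConnectionFactory" />
    </bean>

    <!-- Connection factory used by the inbound (receiving) adapter -->
    <bean id="receiveConnectionFactory"
          class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory">
            <bean class="com.ibm.mq.jms.MQQueueConnectionFactory">
                <property name="hostName" value="${mq.host}" />
                <property name="channel" value="${mq.channel}" />
                <property name="port" value="${mq.port}" />
            </bean>
        </property>
        <property name="sessionCacheSize" value="${receive.factory.cachesize}" />
        <property name="cacheProducers" value="${receive.cache.producers.enabled}" />
        <property name="cacheConsumers" value="${receive.cache.consumers.enabled}" />
    </bean>

    <!-- Connection factory used by the outbound (sending) adapter; same host, channel and port -->
    <bean id="deliverConnectionFactory"
          class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory">
            <bean class="com.ibm.mq.jms.MQQueueConnectionFactory">
                <property name="hostName" value="${mq.host}" />
                <property name="channel" value="${mq.channel}" />
                <property name="port" value="${mq.port}" />
            </bean>
        </property>
        <property name="sessionCacheSize" value="${send.factory.cachesize}" />
        <property name="cacheProducers" value="${send.cache.producers.enabled}" />
        <property name="cacheConsumers" value="${send.cache.consumers.enabled}" />
    </bean>

    <tx:advice id="txAdviceNew" transaction-manager="transactionManager">
        <tx:attributes>
            <tx:method name="send" propagation="REQUIRES_NEW" />
        </tx:attributes>
    </tx:advice>

    <aop:config>
        <aop:advisor advice-ref="txAdviceNew" pointcut="bean(inputChannel)" />
        <aop:advisor advice-ref="txAdviceNew" pointcut="bean(errorChannel)" />
    </aop:config>

    <jms:message-driven-channel-adapter
        id="mdchanneladapter" channel="inputChannel" task-executor="myTaskExecutor"
        connection-factory="receiveConnectionFactory" destination="inputQueue"
        error-channel="errorChannel" concurrent-consumers="${num.consumers}"
        max-concurrent-consumers="${max.num.consumers}" max-messages-per-task="${max.messagesPerTask}"
        transaction-manager="transactionManager" />

    <jms:outbound-channel-adapter
        connection-factory="deliverConnectionFactory" session-transacted="true"
        destination-expression="headers.get('Deliver')" explicit-qos-enabled="true" />
</beans>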
When an MQ exception occurs for any one of the destinations, a partial commit happens, followed by the commit to the failure queue. I am trying to find out whether I am missing some configuration to join the transactions so that a partial commit never happens.
I tried with only one connection factory for both send and receive (receiveConnectionFactory) and the partial commit does not happen; everything works as expected.
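Using a single connection factory for both send and receive is the right way to go in your case.
As far as I can see, your two ConnectionFactories differ only in being separate objects. Everything else looks like it points at the same target MQ server.
If you really cannot use just one ConnectionFactory, you should consider using a JtaTransactionManager, or configuring an org.springframework.data.transaction.ChainedTransactionManager over two JmsTransactionManagers, one for each connection factory.
See Dave Syer's article on the subject: https://www.javaworld.com/article/2077963/open-source-tools/distributed-transactions-in-spring--with-and-without-xa.html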
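As a rough sketch of the single-factory setup, the outbound adapter from the question can point at the same factory that the existing JmsTransactionManager manages. Only the connection-factory attribute differs from the question's configuration. This is an untested illustration built from the question's bean names, and it assumes the send runs on the listener thread, since the transaction is bound to that thread.

```xml
<!-- Sketch: same adapter as in the question, but sharing receiveConnectionFactory.
     With session-transacted="true", the send is expected to join the JMS transaction
     that transactionManager started for the listener thread, so receive and sends
     commit or roll back together. -->
<jms:outbound-channel-adapter
    connection-factory="receiveConnectionFactory" session-transacted="true"
    destination-expression="headers.get('Deliver')" explicit-qos-enabled="true" />
```

With this change the deliverConnectionFactory bean is no longer referenced and could be dropped.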
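The ChainedTransactionManager option could look roughly like the fragment below. The bean ids receiveTransactionManager and deliverTransactionManager are hypothetical names introduced for this sketch. The chained bean takes over the id transactionManager, replacing the single JmsTransactionManager from the question, so the existing transaction-manager references keep resolving. It assumes spring-data-commons is on the classpath. The class has been deprecated in later Spring Data Commons releases, so check the Javadoc for your version.

```xml
<!-- Hypothetical bean ids; one JmsTransactionManager per connection factory -->
<bean id="receiveTransactionManager"
      class="org.springframework.jms.connection.JmsTransactionManager">
    <property name="connectionFactory" ref="receiveConnectionFactory" />
</bean>

<bean id="deliverTransactionManager"
      class="org.springframework.jms.connection.JmsTransactionManager">
    <property name="connectionFactory" ref="deliverConnectionFactory" />
</bean>

<!-- Transactions are started in the listed order and committed in reverse order,
     so the sends (deliver) commit first and the receive commits last. -->
<bean id="transactionManager"
      class="org.springframework.data.transaction.ChainedTransactionManager">
    <constructor-arg>
        <list>
            <ref bean="receiveTransactionManager" />
            <ref bean="deliverTransactionManager" />
        </list>
    </constructor-arg>
</bean>
```

This is a best-effort arrangement rather than a true two-phase commit. If the receive commit fails after the sends have already committed, the input message can be redelivered and the downstream messages sent again, so consumers should tolerate duplicates.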