如何在多个 spring 集成渠道之间传播单个事务
how to propagate single transaction between multiple spring integration channels
在我的应用程序中,我使用了多个 spring 集成渠道。我的要求是所有通道中的数据库操作应该作为单个 transaction.If 所有通道中的数据库操作都成功,然后只有事务应该提交,否则所有数据库操作都应该回滚。
这是我的代码片段:
MyFirstService.java
@Transactional
public class MyFirstService {
@Resource
private MyFirstRepository myFirstRepository;
@Transactional(propagation = Propagation.REQUIRED)
@ServiceActivator(inputChannel = "my-first-servie-input-channel",
outputChannel = "my-first-servie-output-channel")
public String saveData(final MyEntity myEntity) {
myFirstRepository.save(myEntity);
}
}
MySecondService.java
@Transactional
public class MySecondService {
@Resource
private MySecondRepository mySecondRepository;
@Transactional(propagation = Propagation.REQUIRED)
@ServiceActivator(inputChannel = "my-first-servie-output-channel",
outputChannel = "my-Second-servie-output-channel")
public String saveEntity(final MyTestEntity myTestEntity) {
mySecondRepository.save(myTestEntity);
}
}
spring-整合-context.xml
<int:chain input-channel="transaction-inbound-channel" output-channel="my-first-servie-input-channel">
</int:chain>
<int-amqp:inbound-channel-adapter
channel="transaction-inbound-channel"
queue-names="sample.queue"
concurrent-consumers="5"
error-channel="failed-channel"
connection-factory="rabbitConnectionFactory"
mapped-request-headers="*" channel-transacted="true"/>
<rabbit:connection-factory
id="rabbitConnectionFactory"
connection-factory="rcf"
host="${spring.rabbitmq.host}"
port="${spring.rabbitmq.port}"
username="${spring.rabbitmq.username}"
password="${spring.rabbitmq.password}"
/>
<bean id="rcf" class="com.rabbitmq.client.ConnectionFactory">
<property name="host" value="${spring.rabbitmq.host}"/>
<property name="requestedHeartbeat" value="10" />
</bean>
所以在上面的代码中,如果mySecondRepository.save(myTestEntity);
失败,myFirstRepository.save(myEntity);
应该回滚。
我没有在我的应用程序中的任何地方使用 BarrierMessageHandler
。
我将 @EnableTransactionManagement
添加到我的配置 class。
即使我尝试了 channel-transacted="true"
,仍然没有成功。我在这里遗漏了什么吗?
我找到了在同一服务的多个方法之间传播单个事务的解决方案
https://softwareengineering.stackexchange.com/questions/163569/how-to-manage-2-dao-methods-in-a-single-transaction
但不是在多个渠道之间。有人可以帮助我实现这一目标。
提前致谢。
不确定您的担忧。无论如何,消息通道操作都是 Java 方法。因此,相同的 TX 包装在这里也适用。
唯一需要注意的是,TX 是线程绑定的,如果您将消息转移到不同的线程,ExecutorChannel
、QueueChannel
、PublishSubscribeChannel
带有 Executor
等,TX 将被提交,因为您离开了持有线程。
因此,如果您的 my-first-servie-output-channel
和 my-first-servie-output-channel
是 DirectChannel
,则一切正常,您的 TX 也将包装第二个存储库调用。只是因为一切都在同一个线程中完成。
当然,BarrierMessageHandler
有一个技巧可以暂停一个线程,例如使用事务并等待触发器释放它,然后提交 TX。
编辑
对于从 MessageDriven Consumer 到直接下游流的传播事务,您应该在那里提供事务。
channel-transacted
是 RabbitMQ 通道的特定且真正本地的东西。它与您的存储库完全无关。
为此,<int-amqp:inbound-channel-adapter>
支持:
<xsd:attribute name="transaction-manager" type="xsd:string">
<xsd:annotation>
<xsd:appinfo>
<xsd:documentation>
The PlatformTransactionManager to use when the Consumer receives the AMQP Message and the Listener is invoked.
</xsd:documentation>
<tool:annotation kind="ref">
<tool:expected-type type="org.springframework.transaction.PlatformTransactionManager"/>
</tool:annotation>
</xsd:appinfo>
</xsd:annotation>
</xsd:attribute>
PlatformTransactionManager
可以是足以满足您的下游服务的任何实现。
在我的应用程序中,我使用了多个 spring 集成渠道。我的要求是所有通道中的数据库操作应该作为单个 transaction.If 所有通道中的数据库操作都成功,然后只有事务应该提交,否则所有数据库操作都应该回滚。 这是我的代码片段:
MyFirstService.java
@Transactional
public class MyFirstService {
@Resource
private MyFirstRepository myFirstRepository;
@Transactional(propagation = Propagation.REQUIRED)
@ServiceActivator(inputChannel = "my-first-servie-input-channel",
outputChannel = "my-first-servie-output-channel")
public String saveData(final MyEntity myEntity) {
myFirstRepository.save(myEntity);
}
}
MySecondService.java
@Transactional
public class MySecondService {
@Resource
private MySecondRepository mySecondRepository;
@Transactional(propagation = Propagation.REQUIRED)
@ServiceActivator(inputChannel = "my-first-servie-output-channel",
outputChannel = "my-Second-servie-output-channel")
public String saveEntity(final MyTestEntity myTestEntity) {
mySecondRepository.save(myTestEntity);
}
}
spring-整合-context.xml
<int:chain input-channel="transaction-inbound-channel" output-channel="my-first-servie-input-channel">
</int:chain>
<int-amqp:inbound-channel-adapter
channel="transaction-inbound-channel"
queue-names="sample.queue"
concurrent-consumers="5"
error-channel="failed-channel"
connection-factory="rabbitConnectionFactory"
mapped-request-headers="*" channel-transacted="true"/>
<rabbit:connection-factory
id="rabbitConnectionFactory"
connection-factory="rcf"
host="${spring.rabbitmq.host}"
port="${spring.rabbitmq.port}"
username="${spring.rabbitmq.username}"
password="${spring.rabbitmq.password}"
/>
<bean id="rcf" class="com.rabbitmq.client.ConnectionFactory">
<property name="host" value="${spring.rabbitmq.host}"/>
<property name="requestedHeartbeat" value="10" />
</bean>
所以在上面的代码中,如果mySecondRepository.save(myTestEntity);
失败,myFirstRepository.save(myEntity);
应该回滚。
我没有在我的应用程序中的任何地方使用 BarrierMessageHandler
。
我将 @EnableTransactionManagement
添加到我的配置 class。
即使我尝试了 channel-transacted="true"
,仍然没有成功。我在这里遗漏了什么吗?
我找到了在同一服务的多个方法之间传播单个事务的解决方案 https://softwareengineering.stackexchange.com/questions/163569/how-to-manage-2-dao-methods-in-a-single-transaction 但不是在多个渠道之间。有人可以帮助我实现这一目标。 提前致谢。
不确定您的担忧。无论如何,消息通道操作都是 Java 方法。因此,相同的 TX 包装在这里也适用。
唯一需要注意的是,TX 是线程绑定的,如果您将消息转移到不同的线程,ExecutorChannel
、QueueChannel
、PublishSubscribeChannel
带有 Executor
等,TX 将被提交,因为您离开了持有线程。
因此,如果您的 my-first-servie-output-channel
和 my-first-servie-output-channel
是 DirectChannel
,则一切正常,您的 TX 也将包装第二个存储库调用。只是因为一切都在同一个线程中完成。
当然,BarrierMessageHandler
有一个技巧可以暂停一个线程,例如使用事务并等待触发器释放它,然后提交 TX。
编辑
对于从 MessageDriven Consumer 到直接下游流的传播事务,您应该在那里提供事务。
channel-transacted
是 RabbitMQ 通道的特定且真正本地的东西。它与您的存储库完全无关。
为此,<int-amqp:inbound-channel-adapter>
支持:
<xsd:attribute name="transaction-manager" type="xsd:string">
<xsd:annotation>
<xsd:appinfo>
<xsd:documentation>
The PlatformTransactionManager to use when the Consumer receives the AMQP Message and the Listener is invoked.
</xsd:documentation>
<tool:annotation kind="ref">
<tool:expected-type type="org.springframework.transaction.PlatformTransactionManager"/>
</tool:annotation>
</xsd:appinfo>
</xsd:annotation>
</xsd:attribute>
PlatformTransactionManager
可以是足以满足您的下游服务的任何实现。