如何在多个 spring 集成渠道之间传播单个事务

how to propagate single transaction between multiple spring integration channels

在我的应用程序中,我使用了多个 spring 集成渠道。我的要求是所有通道中的数据库操作应该作为单个 transaction.If 所有通道中的数据库操作都成功,然后只有事务应该提交,否则所有数据库操作都应该回滚。 这是我的代码片段:

MyFirstService.java

    @Transactional
    public class MyFirstService {

        @Resource
        private MyFirstRepository myFirstRepository;

        @Transactional(propagation = Propagation.REQUIRED)
        @ServiceActivator(inputChannel = "my-first-servie-input-channel",
                          outputChannel = "my-first-servie-output-channel")
        public String saveData(final MyEntity myEntity) {
             myFirstRepository.save(myEntity);
        }
    }

MySecondService.java

 @Transactional
 public class MySecondService {

    @Resource
    private MySecondRepository mySecondRepository;

    @Transactional(propagation = Propagation.REQUIRED)
    @ServiceActivator(inputChannel = "my-first-servie-output-channel",
                      outputChannel = "my-Second-servie-output-channel")
    public String saveEntity(final MyTestEntity myTestEntity) {
         mySecondRepository.save(myTestEntity);
    }
}

spring-整合-context.xml

    <int:chain input-channel="transaction-inbound-channel" output-channel="my-first-servie-input-channel">
    </int:chain>
<int-amqp:inbound-channel-adapter
            channel="transaction-inbound-channel"
            queue-names="sample.queue"
            concurrent-consumers="5"
            error-channel="failed-channel"
            connection-factory="rabbitConnectionFactory"
            mapped-request-headers="*" channel-transacted="true"/>

    <rabbit:connection-factory
            id="rabbitConnectionFactory"
            connection-factory="rcf"
            host="${spring.rabbitmq.host}"
            port="${spring.rabbitmq.port}"
            username="${spring.rabbitmq.username}"
            password="${spring.rabbitmq.password}"
            />
    <bean id="rcf" class="com.rabbitmq.client.ConnectionFactory">
        <property name="host" value="${spring.rabbitmq.host}"/>
        <property name="requestedHeartbeat" value="10" />
    </bean>

所以在上面的代码中,如果mySecondRepository.save(myTestEntity);失败,myFirstRepository.save(myEntity);应该回滚。

我没有在我的应用程序中的任何地方使用 BarrierMessageHandler。 我将 @EnableTransactionManagement 添加到我的配置 class。

即使我尝试了 channel-transacted="true",仍然没有成功。我在这里遗漏了什么吗?

我找到了在同一服务的多个方法之间传播单个事务的解决方案 https://softwareengineering.stackexchange.com/questions/163569/how-to-manage-2-dao-methods-in-a-single-transaction 但不是在多个渠道之间。有人可以帮助我实现这一目标。 提前致谢。

不确定您的担忧。无论如何,消息通道操作都是 Java 方法。因此,相同的 TX 包装在这里也适用。

唯一需要注意的是,TX 是线程绑定的,如果您将消息转移到不同的线程,ExecutorChannelQueueChannelPublishSubscribeChannel 带有 Executor 等,TX 将被提交,因为您离开了持有线程。

因此,如果您的 my-first-servie-output-channelmy-first-servie-output-channelDirectChannel,则一切正常,您的 TX 也将包装第二个存储库调用。只是因为一切都在同一个线程中完成。

当然,BarrierMessageHandler 有一个技巧可以暂停一个线程,例如使用事务并等待触发器释放它,然后提交 TX。

编辑

对于从 MessageDriven Consumer 到直接下游流的传播事务,您应该在那里提供事务。

channel-transacted 是 RabbitMQ 通道的特定且真正本地的东西。它与您的存储库完全无关。

为此,<int-amqp:inbound-channel-adapter> 支持:

<xsd:attribute name="transaction-manager" type="xsd:string">
        <xsd:annotation>
            <xsd:appinfo>
                <xsd:documentation>
The PlatformTransactionManager to use when the Consumer receives the AMQP Message and the Listener is invoked.
                </xsd:documentation>
                <tool:annotation kind="ref">
                    <tool:expected-type type="org.springframework.transaction.PlatformTransactionManager"/>
                </tool:annotation>
            </xsd:appinfo>
        </xsd:annotation>
    </xsd:attribute>

PlatformTransactionManager 可以是足以满足您的下游服务的任何实现。