How-to 使消息流(伪)事务的任意部分?

How-to make an arbitrary part of a message flow (pseudo) transactional?

我想使由系统日志 inbound-channel-adapter 消息发起的流程具有事务性。不幸的是,适配器不采用可以进行事务处理的轮询器(许多示例中使用其他入站适配器的典型方法)。

有什么解决方法吗?

编辑

经过一番思考,我意识到我的意图与最初描述的有点不同(因此更改了标题)。基本上我想做的只是一些简单的 straight-forward 方法来制作我的消息流 pseudo-transactional 的任意部分(从流中的某个任意通道开始)。含义 - 如果流程完成而没有任何异常,我想执行一些自定义代码(但请注意,我不希望我的自定义伪提交代码成为流程本身的一部分(步骤))。如果发生任何异常,我想执行一些自定义代码。

使用 TransactionSynchronizationFactory 的语义非常适合我。

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-commit expression="payload.renameTo('/success/' + payload.name)" channel="committedChannel" />
    <int:after-rollback expression="payload.renameTo('/failed/' + payload.name)" channel="rolledBackChannel" />
</int:transaction-synchronization-factory>

唯一的问题是 how-to 将其与流程的其余部分连接在一起。我尝试的是定义中间虚拟 service-activator 端点,该端点从我希望事务开始的通道接收消息。然后向 service-activator 添加事务轮询器。但是这种方法有其自身的问题,因为为了使用轮询器,您必须将传入通道定义为 queue 通道,这似乎在单独的线程中执行流程(或者至少我观察到一些异步行为) ).

通过使适配器的通道启动事务,来自消息驱动适配器的任何流都可以 运行 在事务范围内:

<tx:advice id="txAdvice" transaction-manager="transactionManager">
    <tx:attributes>
        <tx:method name="send"/>
    </tx:attributes>
</tx:advice>

<aop:config>
    <aop:advisor advice-ref="txAdvice" pointcut="bean(fromSyslog)"/>
</aop:config>

<int:channel id="fromSyslog" />

通道(以及所有下游通道)必须是 DirectChannel(无队列通道或任务执行器)。在异步切换到另一个线程后,事务将提交。

我相信你明白,但为了其他读者的利益,这不会使系统日志适配器本身成为事务性的,只是下游流。

编辑

另一种技术是使用中流事务网关...

@Transactional
public interface TxGate {

    void oneWay(Message<?> message);

}

<int:service-activator input-channel="fromSyslog" ref="txGate" />

<int:gateway id="txGate" service-interface="foo.TxGate"
     default-request-channel="txSyslog" error-channel="foo" />

这样你就可以在事务范围内处理异常并决定是否提交(从错误流中抛出异常会回滚)。

void return 很重要;因为下游流没有 return 回复。

EDIT2

回应您编辑过的问题。

所以,我提供的解决方案(特别是在中流交易网关中)似乎只允许您在出现错误时采取一些措施,而您还想在之后采取一些(不同的)措施成功。

有两种方法可以做到这一点。

  1. 使子流中的最后一个通道成为publish-subscribe-channel;添加第二个消费者(使用 order 明确定义调用它们的顺序)并对第二个订阅者执行 'success' 操作 - 他不会在异常后被调用(默认情况下)所以您将继续处理网关错误通道上的异常情况。
  2. 使用 ChannelInterceptor。在 preSend() 开始交易。 afterSendCompletion() 方法将在子流程完成后调用。如果子流抛出异常,则填充 Exception 参数的存在(或不存在)。

我们可以考虑在框架中添加这样的拦截器,如果你想考虑contributing it