How-to 使消息流(伪)事务的任意部分?
How-to make an arbitrary part of a message flow (pseudo) transactional?
我想使由系统日志 inbound-channel-adapter 消息发起的流程具有事务性。不幸的是,适配器不采用可以进行事务处理的轮询器(许多示例中使用其他入站适配器的典型方法)。
有什么解决方法吗?
编辑
经过一番思考,我意识到我的意图与最初描述的有点不同(因此更改了标题)。基本上我想做的只是一些简单的 straight-forward 方法来制作我的消息流 pseudo-transactional 的任意部分(从流中的某个任意通道开始)。含义 - 如果流程完成而没有任何异常,我想执行一些自定义代码(但请注意,我不希望我的自定义伪提交代码成为流程本身的一部分(步骤))。如果发生任何异常,我想执行一些自定义代码。
使用 TransactionSynchronizationFactory 的语义非常适合我。
<int:transaction-synchronization-factory id="syncFactory">
<int:after-commit expression="payload.renameTo('/success/' + payload.name)" channel="committedChannel" />
<int:after-rollback expression="payload.renameTo('/failed/' + payload.name)" channel="rolledBackChannel" />
</int:transaction-synchronization-factory>
唯一的问题是 how-to 将其与流程的其余部分连接在一起。我尝试的是定义中间虚拟 service-activator 端点,该端点从我希望事务开始的通道接收消息。然后向 service-activator 添加事务轮询器。但是这种方法有其自身的问题,因为为了使用轮询器,您必须将传入通道定义为 queue 通道,这似乎在单独的线程中执行流程(或者至少我观察到一些异步行为) ).
通过使适配器的通道启动事务,来自消息驱动适配器的任何流都可以 运行 在事务范围内:
<tx:advice id="txAdvice" transaction-manager="transactionManager">
<tx:attributes>
<tx:method name="send"/>
</tx:attributes>
</tx:advice>
<aop:config>
<aop:advisor advice-ref="txAdvice" pointcut="bean(fromSyslog)"/>
</aop:config>
<int:channel id="fromSyslog" />
通道(以及所有下游通道)必须是 DirectChannel
(无队列通道或任务执行器)。在异步切换到另一个线程后,事务将提交。
我相信你明白,但为了其他读者的利益,这不会使系统日志适配器本身成为事务性的,只是下游流。
编辑
另一种技术是使用中流事务网关...
@Transactional
public interface TxGate {
void oneWay(Message<?> message);
}
<int:service-activator input-channel="fromSyslog" ref="txGate" />
<int:gateway id="txGate" service-interface="foo.TxGate"
default-request-channel="txSyslog" error-channel="foo" />
这样你就可以在事务范围内处理异常并决定是否提交(从错误流中抛出异常会回滚)。
void return 很重要;因为下游流没有 return 回复。
EDIT2
回应您编辑过的问题。
所以,我提供的解决方案(特别是在中流交易网关中)似乎只允许您在出现错误时采取一些措施,而您还想在之后采取一些(不同的)措施成功。
有两种方法可以做到这一点。
- 使子流中的最后一个通道成为
publish-subscribe-channel
;添加第二个消费者(使用 order
明确定义调用它们的顺序)并对第二个订阅者执行 'success' 操作 - 他不会在异常后被调用(默认情况下)所以您将继续处理网关错误通道上的异常情况。
- 使用
ChannelInterceptor
。在 preSend()
开始交易。 afterSendCompletion()
方法将在子流程完成后调用。如果子流抛出异常,则填充 Exception
参数的存在(或不存在)。
我们可以考虑在框架中添加这样的拦截器,如果你想考虑contributing it。
我想使由系统日志 inbound-channel-adapter 消息发起的流程具有事务性。不幸的是,适配器不采用可以进行事务处理的轮询器(许多示例中使用其他入站适配器的典型方法)。
有什么解决方法吗?
编辑
经过一番思考,我意识到我的意图与最初描述的有点不同(因此更改了标题)。基本上我想做的只是一些简单的 straight-forward 方法来制作我的消息流 pseudo-transactional 的任意部分(从流中的某个任意通道开始)。含义 - 如果流程完成而没有任何异常,我想执行一些自定义代码(但请注意,我不希望我的自定义伪提交代码成为流程本身的一部分(步骤))。如果发生任何异常,我想执行一些自定义代码。
使用 TransactionSynchronizationFactory 的语义非常适合我。
<int:transaction-synchronization-factory id="syncFactory">
<int:after-commit expression="payload.renameTo('/success/' + payload.name)" channel="committedChannel" />
<int:after-rollback expression="payload.renameTo('/failed/' + payload.name)" channel="rolledBackChannel" />
</int:transaction-synchronization-factory>
唯一的问题是 how-to 将其与流程的其余部分连接在一起。我尝试的是定义中间虚拟 service-activator 端点,该端点从我希望事务开始的通道接收消息。然后向 service-activator 添加事务轮询器。但是这种方法有其自身的问题,因为为了使用轮询器,您必须将传入通道定义为 queue 通道,这似乎在单独的线程中执行流程(或者至少我观察到一些异步行为) ).
通过使适配器的通道启动事务,来自消息驱动适配器的任何流都可以 运行 在事务范围内:
<tx:advice id="txAdvice" transaction-manager="transactionManager">
<tx:attributes>
<tx:method name="send"/>
</tx:attributes>
</tx:advice>
<aop:config>
<aop:advisor advice-ref="txAdvice" pointcut="bean(fromSyslog)"/>
</aop:config>
<int:channel id="fromSyslog" />
通道(以及所有下游通道)必须是 DirectChannel
(无队列通道或任务执行器)。在异步切换到另一个线程后,事务将提交。
我相信你明白,但为了其他读者的利益,这不会使系统日志适配器本身成为事务性的,只是下游流。
编辑
另一种技术是使用中流事务网关...
@Transactional
public interface TxGate {
void oneWay(Message<?> message);
}
<int:service-activator input-channel="fromSyslog" ref="txGate" />
<int:gateway id="txGate" service-interface="foo.TxGate"
default-request-channel="txSyslog" error-channel="foo" />
这样你就可以在事务范围内处理异常并决定是否提交(从错误流中抛出异常会回滚)。
void return 很重要;因为下游流没有 return 回复。
EDIT2
回应您编辑过的问题。
所以,我提供的解决方案(特别是在中流交易网关中)似乎只允许您在出现错误时采取一些措施,而您还想在之后采取一些(不同的)措施成功。
有两种方法可以做到这一点。
- 使子流中的最后一个通道成为
publish-subscribe-channel
;添加第二个消费者(使用order
明确定义调用它们的顺序)并对第二个订阅者执行 'success' 操作 - 他不会在异常后被调用(默认情况下)所以您将继续处理网关错误通道上的异常情况。 - 使用
ChannelInterceptor
。在preSend()
开始交易。afterSendCompletion()
方法将在子流程完成后调用。如果子流抛出异常,则填充Exception
参数的存在(或不存在)。
我们可以考虑在框架中添加这样的拦截器,如果你想考虑contributing it。