Camel:如果消费者无法确认,则回滚 jdbc 事务
Camel: rollback jdbc transaction if consumer cannot acknowledge
我有一个使用 AMQ/Camel 消费消息的后端应用程序。在每个消息处理期间,使用 @Transactional
JDBC DAO 完成数据库插入。我配置了两个消息代理。
问题:如果主代理停止,则当前的交换在消费者中变得陈旧:其中一些已经进行了插入,但尚未将 ACK 发送回代理。最后,代理将执行消息回滚,然后应用程序再次获取相同的消息,导致数据库错误,因为它们已经被插入。
如何配置路由,以便在连接断开时尝试回滚?这只有在我以编程方式管理交易时才有可能吗?
首先,DAO 上的 @Transactional
没有意义。您希望在完成请求后提交给数据库。对于 Web 应用程序,您通常在控制器上有事务边界。对于 JMS 消费者,您将其放在消费者(或附近)上。这样,您的事务边界就包含了您的业务逻辑和持久化逻辑。一旦线程通过事务边界,该边界的事务管理器将提交(至少在它管理一个资源时,如数据库),并且在此之后无法回滚。
也就是说,管理具有多个数据源的事务通常是一个难题。在大多数情况下,您会发现使系统对重复消息具有弹性比将提交同步到多个数据源更容易(例如,保留消息 ID 并在开始处理新消息之前查找它)。
虽然你有一些选择:
- 当我从 JMS 数据源消费并持久化到数据库时,我
通常将数据库的事务边界设置为高
尽可能在路线中。您可以将
.transacted()
作为
第一条指令之一,或者更确切地说,您可以标记 JMS
使用 transacted=true
的消费者,然后挂钩交易
提交给数据库的管理器。您可能仍然会在提交到数据库时回滚代理,因此您需要一些逻辑来处理重复的消息。
- 如果您真的想在对代理的提交失败时回滚对数据库的提交,您唯一的选择是使用实现 XA 事务的事务管理器。这些事务管理器将实施两阶段提交(即 1. 询问所有资源是否准备好提交和 2. 提交所有资源。如果第 1 阶段失败,则回滚所有事务。)如果您决定尝试这样做,您必须在骆驼端点标记您的事务边界,因为这是您想要提交给代理和数据库的时候。
XA-transactions 将是一个比处理重复消息更复杂的解决方案,它需要更多的开发人员。您已收到警告。
我有一个使用 AMQ/Camel 消费消息的后端应用程序。在每个消息处理期间,使用 @Transactional
JDBC DAO 完成数据库插入。我配置了两个消息代理。
问题:如果主代理停止,则当前的交换在消费者中变得陈旧:其中一些已经进行了插入,但尚未将 ACK 发送回代理。最后,代理将执行消息回滚,然后应用程序再次获取相同的消息,导致数据库错误,因为它们已经被插入。
如何配置路由,以便在连接断开时尝试回滚?这只有在我以编程方式管理交易时才有可能吗?
首先,DAO 上的 @Transactional
没有意义。您希望在完成请求后提交给数据库。对于 Web 应用程序,您通常在控制器上有事务边界。对于 JMS 消费者,您将其放在消费者(或附近)上。这样,您的事务边界就包含了您的业务逻辑和持久化逻辑。一旦线程通过事务边界,该边界的事务管理器将提交(至少在它管理一个资源时,如数据库),并且在此之后无法回滚。
也就是说,管理具有多个数据源的事务通常是一个难题。在大多数情况下,您会发现使系统对重复消息具有弹性比将提交同步到多个数据源更容易(例如,保留消息 ID 并在开始处理新消息之前查找它)。
虽然你有一些选择:
- 当我从 JMS 数据源消费并持久化到数据库时,我
通常将数据库的事务边界设置为高
尽可能在路线中。您可以将
.transacted()
作为 第一条指令之一,或者更确切地说,您可以标记 JMS 使用transacted=true
的消费者,然后挂钩交易 提交给数据库的管理器。您可能仍然会在提交到数据库时回滚代理,因此您需要一些逻辑来处理重复的消息。 - 如果您真的想在对代理的提交失败时回滚对数据库的提交,您唯一的选择是使用实现 XA 事务的事务管理器。这些事务管理器将实施两阶段提交(即 1. 询问所有资源是否准备好提交和 2. 提交所有资源。如果第 1 阶段失败,则回滚所有事务。)如果您决定尝试这样做,您必须在骆驼端点标记您的事务边界,因为这是您想要提交给代理和数据库的时候。
XA-transactions 将是一个比处理重复消息更复杂的解决方案,它需要更多的开发人员。您已收到警告。