使用 Akka 持久邮箱进行事务性消息处理

Transactional message processing with an Akka durable mailbox

我有一个 Java/Akka 应用程序,它集成了持久性 JMS (ActiveMQ) 队列。

PersistentQueue 包装了一个包含批处理作业的 JMS/ActiveMQ 队列。在事务中接收消息,因此如果服务器在作业执行过程中宕机,则作业将在重新启动时保留。如果作业成功完成或被用户取消,则提交此事务以永久删除消息,如果作业失败,则事务回滚(将消息放在队列的前面)如果作业执行较少比 MAX_RETRY 倍。

BatchManager 是与 REST 控制器的接口。由于作业执行期间调用的存储过程的限制,它一次只能执行一个批处理作业。 BatchManager 从控制器接收作业并将它们发送到 PersistentQueue 以放入 JMS 队列,然后在作业入队时轮询 PersistentQueue 以获取新作业(除非另一个作业正在执行)或作业完成时。

我想删除 JMS 队列和处理其 JMSExceptions 的所有复杂问题,并将其替换为 BatchManager 的持久邮箱。问题是我不知道如何使用持久邮箱复制 JMS 事务——我的理解是,如果服务器在作业执行期间出现故障,那么该消息将永远丢失(而不是放回队列中等待JMS 队列)。

有没有办法用 Akka 持久邮箱实现事务性消息处理,这样如果服务器在执行时出现故障,消息也不会丢失?

Akka documentation 说:

A durable mailbox is like any other mailbox not likely to be transactional. It's possible if the actor crashes after receiving a message, but before completing processing of it, that the message could be lost.

但还有另一种类型的邮箱 - 带有显式确认的邮箱(又名 PeekMailbox)。 Here you can find usage example. And here为实现源码。

我认为您可以通过实现自定义持久邮箱来实现您的目标,它扩展了一些现有的实现并使用 PeekMailbox 功能对其进行了扩充。

您可以使用 PersistentActor 来跟踪已发布和已完成的作业。

"distributed workers" 激活器模板包含这样一个工作单元管理角色。 (以及动态工作人员注册和集群,但即使您不感兴趣,它仍然值得一看)。

Scala version of the template and Java version of the template