发件箱模式 - 我们如何防止消息中继进程生成重复的消息?

Outbox Pattern - How can we prevent the Message Relay process from generating duplicated messages?

通常实现 outbox pattern 的方法是将消息负载存储在发件箱 table 中并有一个单独的进程(消息中继)查询未决消息并将它们发布到消息代理中,在我的例子中是 Kafka。

发件箱的状态table可能如下所示。

 OUTBOX TABLE
 ---------------------------------
|ID | STATE     | TOPIC | PAYLOAD |
 ---------------------------------
| 1 | PROCESSED | user            |
| 2 | PENDING   | user            |
| 3 | PENDING   | billing         |
----------------------------------

My Message Relay 是一个 Spring Boot/Cloud Stream 应用程序,它会定期 (@Scheduled) 查找 PENDING 记录,将它们发布到 Kafka 并将记录更新为 PROCESSED 状态。

第一个问题是:如果我启动消息中继的多个实例,所有实例都会查询发件箱table,并且可能在某些时候不同的实例会获取相同的 PENDING 注册表以发布到 Kafka,从而生成重复的消息。我该如何防止这种情况?

另一种情况:假设只有一个Message Relay。它获得一个 PENDING 记录,将其发布到主题,但在将记录更新为 PROCESSED 之前崩溃。当它再次启动时,它会找到相同的 PENDING 记录并再次发布。有没有办法避免这种重复,或者唯一的方法是设计一个幂等系统。

为了防止第一个问题,您必须使用数据库锁定。

SELECT * FROM outbox WHERE id = 1 FOR UPDATE

这将防止其他进程访问同一行。

第二个问题你解决不了因为你没有Kafka的分布式事务

因此,一种方法是在将记录发送到 Kafka 之前将记录设置为 PROCESSING 状态,如果应用程序崩溃,您应该检查是否有处于 PROCESSING 状态的记录并执行一些清理任务以查明它们是否已经发送到 Kafka。

但最好的解决方案是拥有一个可以处理重复项的幂等系统。

您可以使用 debezium (https://debezium.io/) 来读取 SQL 服务器的 bin-log 并将事件写入 Kafka。它将解决您的两个问题。

对于第一个问题,您可以使用ShedLock library。它确保在任何时候,只有一个服务实例正在获取计划任务。

对于第二个问题,是的,你必须开发幂等消费者。您可以通过将消息 ID 传递给消费者来做到这一点,并维护一个 table 以检查具有消息 ID 的消息是否已被处理,只需忽略它即可。