用于删除重复消息的 RabbitMQ 插件

RabbitMQ plugin to remove duplicate messages

我有一个用于文档生成的 RabbitMQ queues。基本上,每个文档都有 typestate(新的、处理中的、准备好的),所以我使用主题交换和路由键,如 type.state。每次文档更改时,我都会将带有最后文档描述的消息发送到交易所,它运行良好。

但是有时文档会被处理两次:

  1. 用户发送新文档。所以新消息 report.new 被发送到交换。
  2. 虽然工作人员尚未开始文档处理(queue 尚未达到),但用户更新了文档。同一文档的新消息 report.new 已发送。
  3. 所以现在工作人员收到第一条消息并开始他的工作,而文档已更改,所以这项工作完全没有意义。

现在我只是将小代码添加到 worker 中,将消息中的 last_modified 文档密钥与数据库中的文档密钥进行比较,如果它们不相同,则确认消息。但是我认为这不是最好的解决方案。

我的想法是将 ID 添加到消息 headers 中,并使用一些 RabbitMQ 插件从 queue 中删除具有相同 ID 的旧消息。

谢谢。

P.S。也许另一个 MQ 引擎在这里有用?例如。也许ActiveMQ有这样的功能?

好的,我已经阅读了有关 RabbitMQ 内部架构的内容,发现这是不可能的。所以寻找它的人的方式。

  1. 仅发送邮件正文中的文档 ID
  2. 为工作人员创建一个键值存储(为此我使用 memcached)。键是 ID 值是此 ID.
  3. 的最后一个工作人员 运行 的时间戳
  4. 当工作人员收到消息时,它会检查消息时间戳是否大于键值存储中的时间戳。如果是,则更新存储中的时间戳和 运行 任务,否则跳过它。

您可以查看我写的这个 plugin,它允许 de-duplicate 在代理中发布消息。

您可以根据需要在交易所de-duplicate或在queue。您的发布者唯一需要做的就是将 x-deduplicate-message 消息 header 设置为您消息的 ID

如您所写,ActiveMQ has "duplicate message detection", but it works differently. It does not remove old message from the queue but it does not add new message to it instead. So it works the same as