AMQP 按顺序将消息路由到多个 queues

AMQP routing messages to multiple queues in order

小背景

我的问题是

AMQP(特别是Rabbit-mq)中有什么可以帮助我按顺序处理这些消息吗? (pre-process,然后 process,然后 post-process)。

我目前知道的解决方法

  1. 在服务本身中处理路由逻辑,因此 pre-process 服务必须知道下一步是 process。这些服务将处理将消息发布到下一个交换器或 queue.

    我唯一的问题是,我不一定希望 pre-process 服务知道,或者关心消息接下来必须去哪里。如果我需要添加另一个服务 in-between pre-processprocess,我将不得不更改 pre-process 中的应用程序代码或配置,然后还要确保新服务也知道下一步是process.

  2. 使用某种类型的服务总线。

    我对服务总线了解不多,但我认为这就是它用来处理的东西。
    我对服务总线的唯一问题是我看过的所有实现(NServiceBus、MassTransit)看起来都非常重量级。他们有自己的新术语集,有很多功能,如果出现问题,我们现在需要成为该特定总线技术的专家,他们似乎给流程增加了大量 un-needed 复杂性。

  3. 创建我自己的路由器服务。

    每条消息都会在其 headers 中包含有关 queue 它被击中的信息。然后路由器将负责将消息发送到正确的服务。每个服务在完成工作后总是会将其消息发布回路由器。

    这样做有什么味道吗?我看到的唯一问题是,似乎我们基本上从我们的 queue 系统中夺走了很多控制权,该系统具有非常强大的路由功能。

任何关于此事的想法,或者战壕中的一些例子都会很棒。

我觉得,你可以玩话题交流(https://www.rabbitmq.com/tutorials/tutorial-five-dotnet.html)。 您可以将处理历史编码为路由键。

第一个处理器可以使用私有队列绑定到已知的 exchange 并使用路由键 "raw" 订阅消息(例如)并使用路由键 [=] 发布新消息32=] 在同一个 交易所 .

第二个处理器可以在同一个 exchange 上使用路由键 "raw.proc1" 订阅消息。

第三个处理器可以在同一个 exchange 上查找具有路由键 "raw.proc1.proc2" 的消息,依此类推。

Rabbit 具有强大的路由功能,但它不进行服务编排。它更多的是可以依赖消息传递的服务总线的范围queue。

你可以精炼1。:

  1. 一条消息是self-sufficient并且包含所有的路由逻辑。 例如,消息可以包含 header 以及与处理链关联的所有路由逻辑。例如 属性 routings:

    "routings": ["pre-process" , "process", "post-process"]

    所以一个流程步骤不需要知道下一个流程步骤。它弹出 routings 数组的第一个条目并将下一条消息发送到此 queue。如果处理步骤是线性的,则非常适合,不需要条件步骤或历史化。

    所以每个服务都必须包含路由逻辑。

第三种解决方案更易于管理(服务之间的关注点分离)。一项服务负责路由,它通过 RabbitMQ 调用适当的流程步骤。味道可能是它需要比第一个解决方案更多的消息。此缺点的成本取决于您的要求。事实上,要改进这一点,您将倾向于服务总线,它将解决方案 1) 和 3) 混合在一起。

我在工作中使用了第三种解决方案。处理步骤由状态机定义。