AMQP/RabbitMQ - 如何避免竞争条件
AMQP/RabbitMQ - How to avoid race conditions
我有以下架构:
Architecture
- 有固定数量的输入源。每个输入源都是等效的。
- AMQP 代理。在我的案例中,我使用的是 RabbitMQ。
- 目前有2个消费者。同样,每个消费者都是等价的。
输入源正在发送待处理的命令。这些命令由代理转发并由两个消费者之一接收。
我需要以下行为:
- 如果一个输入源发送多个命令,则所有命令必须按顺序处理。即在2条命令的例子中,不允许消费者1正在处理命令1,同时消费者2正在处理命令2。
- 但是,可以同时处理来自两个不同输入源的两个命令。
是否可以通过 AMQP/RabbitMQ 强制执行此行为?
您可以为每个队列使用一个使用者来覆盖您的场景。
每个队列可以按顺序处理消息。
另一种方法是只使用一个队列并使用 envelope.getExchange()
来了解来源,或者 tag
您的消息使用 AMQP.BasicProperties properties
这样,比如可以多线程消费消息,每个分配一个线程tag
为了保证顺序,您可能需要聚合消息。您可以在发布到队列之前将来自一个源的命令批处理成一条消息,因此进入队列的消息可以包含一个或多个将由消费者执行的命令。
我有以下架构: Architecture
- 有固定数量的输入源。每个输入源都是等效的。
- AMQP 代理。在我的案例中,我使用的是 RabbitMQ。
- 目前有2个消费者。同样,每个消费者都是等价的。
输入源正在发送待处理的命令。这些命令由代理转发并由两个消费者之一接收。
我需要以下行为:
- 如果一个输入源发送多个命令,则所有命令必须按顺序处理。即在2条命令的例子中,不允许消费者1正在处理命令1,同时消费者2正在处理命令2。
- 但是,可以同时处理来自两个不同输入源的两个命令。
是否可以通过 AMQP/RabbitMQ 强制执行此行为?
您可以为每个队列使用一个使用者来覆盖您的场景。 每个队列可以按顺序处理消息。
另一种方法是只使用一个队列并使用 envelope.getExchange()
来了解来源,或者 tag
您的消息使用 AMQP.BasicProperties properties
这样,比如可以多线程消费消息,每个分配一个线程tag
为了保证顺序,您可能需要聚合消息。您可以在发布到队列之前将来自一个源的命令批处理成一条消息,因此进入队列的消息可以包含一个或多个将由消费者执行的命令。