RabbitMQ — 为什么错误的订阅者会收到已发布的消息?
RabbitMQ — Why does the wrong subscriber get a published message?
我有两个服务,Manager 和 Collector。
- Manager 使用 routingKey
user.collected
订阅了队列 COLLECTED_USER
并调用了 UserCollected
处理程序。
- Collector 使用 routingKey
user.collect
订阅了 Queue COLLECT_USER
并调用了 CollectUser
处理程序。
可以有多个收集器,所以我将 exclusive
设置为 false
(代码见下文)。
还有其他服务可以侦听
等事件
user.created
,
user.updated
,
user.deleted
此外还有监听更多一般事件的服务,例如
#.created
user.#
等等。
所以我正在使用 topic
交换。
设置
| exchange | type | routingKey | queueName |
| -------- | ----- | -------------- | ------------- |
| MY_APP | topic | user.collect | COLLECT_USER |
| MY_APP | topic | user.collected | COLLECTED_USER |
应该发生什么:
- Manager 使用 routingKey
user.collect
发布消息
- Collector 获取
user.collect
消息并调用 CollectUser
处理程序
- Collector 的
CollectUser
处理程序确实有效,然后使用 routingKey user.collected
发布消息
- Manager 获取
user.collected
消息并调用 UserCollected
处理程序
实际发生了什么:
- Manager 使用 routingKey
user.collect
发布消息(正确)
- Collector 获取
user.collect
消息并调用 CollectUser
处理程序(正确)
- Manager 也收到
user.collect
消息并使用错误数据调用 UserCollected
处理程序。 (错误)
- Collector 的
CollectUser
处理程序确实有效,然后使用 routingKey user.collected
(正确) 发布消息
- Manager 获取
user.collected
消息并调用 UserCollected
处理程序(正确)
我的问题
为什么 Manager 收到 user.collect
消息,给定:
- 它正在侦听
COLLECTED_USER
队列而不是 COLLECT_USER
队列,并且
- 正在侦听
COLLECT_USER
队列的 收集器 已经处理了消息。
实施细节
我按如下方式创建订阅者和发布者(针对相关性进行了修剪)
创建订阅者
给定 AMQP url
和参数 url
、exchange
、type
、routingKey
、queueName
和 handler
const connection = await amqp.connect(url)
const channel = await connection.createChannel()
channel.assertExchange(exchange, type, { durable: true })
const result = await channel.assertQueue(queueName, { exclusive: false })
channel.bindQueue(result.queue, exchange, routingKey)
channel.prefetch(1)
channel.consume(result.queue, handler)
创建发布者
给定 AMQP url
和参数 url
、exchange
和 type
const connection = await amqp.connect(url)
const channel = await connection.createChannel()
await channel.assertExchange(exchange, type, { durable: true })
发布
给定 channel
和参数 exchange
、routingKey
和 message
await channel.publish(exchange, routingKey, message)
备注
我终于弄明白我的问题是什么了。肮脏的交换。在对此进行试验时,我无意中添加了一个将消息路由到错误队列的交换器,这让我感到困惑。
为了修复它,我启动了 RabbitMQ 管理 GUI 并删除了所有队列,让我的代码创建它需要的队列。上面概述的代码没有问题。
我有两个服务,Manager 和 Collector。
- Manager 使用 routingKey
user.collected
订阅了队列COLLECTED_USER
并调用了UserCollected
处理程序。 - Collector 使用 routingKey
user.collect
订阅了 QueueCOLLECT_USER
并调用了CollectUser
处理程序。
可以有多个收集器,所以我将 exclusive
设置为 false
(代码见下文)。
还有其他服务可以侦听
等事件user.created
,user.updated
,user.deleted
此外还有监听更多一般事件的服务,例如
#.created
user.#
等等。
所以我正在使用 topic
交换。
设置
| exchange | type | routingKey | queueName |
| -------- | ----- | -------------- | ------------- |
| MY_APP | topic | user.collect | COLLECT_USER |
| MY_APP | topic | user.collected | COLLECTED_USER |
应该发生什么:
- Manager 使用 routingKey
user.collect
发布消息
- Collector 获取
user.collect
消息并调用CollectUser
处理程序 - Collector 的
CollectUser
处理程序确实有效,然后使用 routingKeyuser.collected
发布消息
- Manager 获取
user.collected
消息并调用UserCollected
处理程序
实际发生了什么:
- Manager 使用 routingKey
user.collect
发布消息(正确) - Collector 获取
user.collect
消息并调用CollectUser
处理程序(正确) - Manager 也收到
user.collect
消息并使用错误数据调用UserCollected
处理程序。 (错误) - Collector 的
CollectUser
处理程序确实有效,然后使用 routingKeyuser.collected
(正确) 发布消息
- Manager 获取
user.collected
消息并调用UserCollected
处理程序(正确)
我的问题
为什么 Manager 收到 user.collect
消息,给定:
- 它正在侦听
COLLECTED_USER
队列而不是COLLECT_USER
队列,并且 - 正在侦听
COLLECT_USER
队列的 收集器 已经处理了消息。
实施细节
我按如下方式创建订阅者和发布者(针对相关性进行了修剪)
创建订阅者
给定 AMQP url
和参数 url
、exchange
、type
、routingKey
、queueName
和 handler
const connection = await amqp.connect(url)
const channel = await connection.createChannel()
channel.assertExchange(exchange, type, { durable: true })
const result = await channel.assertQueue(queueName, { exclusive: false })
channel.bindQueue(result.queue, exchange, routingKey)
channel.prefetch(1)
channel.consume(result.queue, handler)
创建发布者
给定 AMQP url
和参数 url
、exchange
和 type
const connection = await amqp.connect(url)
const channel = await connection.createChannel()
await channel.assertExchange(exchange, type, { durable: true })
发布
给定 channel
和参数 exchange
、routingKey
和 message
await channel.publish(exchange, routingKey, message)
备注
我终于弄明白我的问题是什么了。肮脏的交换。在对此进行试验时,我无意中添加了一个将消息路由到错误队列的交换器,这让我感到困惑。
为了修复它,我启动了 RabbitMQ 管理 GUI 并删除了所有队列,让我的代码创建它需要的队列。上面概述的代码没有问题。