没有收到使用 amqplib 发送到 RabbitMQ 并由 NestJS 处理的回复队列中的消息
Not receiving messages in the reply queue that sent to RabbitMQ using amqplib and processed by NestJS
所以我使用 NestJS (v8) 和 RabbitMQ 传输 (Transport.RMQ
) 来监听消息
我的 NestJS 代码看起来像这样:
// main.ts
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
transport: Transport.RMQ,
options: {
urls: ['amqp://localhost:5672'],
queue: 'my-queue',
replyQueue: 'my-reply-queue'
},
});
// my.controller.ts
import { Controller } from '@nestjs/common';
import { MessagePattern } from '@nestjs/microservices';
@Controller()
export class MyController {
@MessagePattern('something')
do(data: {source: string}): {source: string} {
console.log(data);
data.source += ' | MyController';
return data;
}
}
并且在 Node.JS 应用程序中,我使用 amqplib
发送到 NestJS 应用程序并接收响应
这是 Node.JS 应用程序的代码:
const queueName = 'my-queue';
const replyQueueName = 'my-reply-queue';
const amqplib = require('amqplib');
async function run() {
const conn = await amqplib.connect('amqp://localhost:5672');
const channel = await conn.createChannel();
await channel.assertQueue(queueName);
await channel.assertQueue(replyQueueName);
// Consumer: Listen to messages from the reply queue
await channel.consume(replyQueueName, (msg) => console.log(msg.content.toString()));
// Publisher: Send message to the queue
channel.sendToQueue(
queueName,
Buffer.from(
JSON.stringify({
pattern: 'something',
data: { source: 'node-application' },
})
),
{ replyTo: replyQueueName }
);
}
run()
当我 运行 节点和 Nest.JS 应用程序时,Nest.JS 从 Node.JS 发布者获取消息,但 Node.JS 消费者永远不会打电话回复
解决方法是在 Node.JS 应用程序发送的数据中添加一个 id
键:
// ...
// Publisher: Send message to the queue
channel.sendToQueue(
queueName,
Buffer.from(
JSON.stringify({
// Add the `id` key here so the Node.js consumer will get the message in the reply queue
id: '',
pattern: 'something',
data: { source: 'node-application' },
})
),
{ replyTo: replyQueueName }
);
// ...
详细解释(在Nest.JS源代码中)
这是因为在 server-rmq.ts
文件的 handleMessage
function 中检查消息的 id
属性 是否为 undefined
// https://github.com/nestjs/nest/blob/026c1bd61c561a3ad24da425d6bca27d47567bfd/packages/microservices/server/server-rmq.ts#L139-L141
public async handleMessage(
message: Record<string, any>,
channel: any,
): Promise<void> {
// ...
if (isUndefined((packet as IncomingRequest).id)) {
return this.handleEvent(pattern, packet, rmqContext);
}
// ...
}
并且handleEvent
function中没有发送消息到回复队列的逻辑,只是处理事件
所以我使用 NestJS (v8) 和 RabbitMQ 传输 (Transport.RMQ
) 来监听消息
我的 NestJS 代码看起来像这样:
// main.ts
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
transport: Transport.RMQ,
options: {
urls: ['amqp://localhost:5672'],
queue: 'my-queue',
replyQueue: 'my-reply-queue'
},
});
// my.controller.ts
import { Controller } from '@nestjs/common';
import { MessagePattern } from '@nestjs/microservices';
@Controller()
export class MyController {
@MessagePattern('something')
do(data: {source: string}): {source: string} {
console.log(data);
data.source += ' | MyController';
return data;
}
}
并且在 Node.JS 应用程序中,我使用 amqplib
发送到 NestJS 应用程序并接收响应
这是 Node.JS 应用程序的代码:
const queueName = 'my-queue';
const replyQueueName = 'my-reply-queue';
const amqplib = require('amqplib');
async function run() {
const conn = await amqplib.connect('amqp://localhost:5672');
const channel = await conn.createChannel();
await channel.assertQueue(queueName);
await channel.assertQueue(replyQueueName);
// Consumer: Listen to messages from the reply queue
await channel.consume(replyQueueName, (msg) => console.log(msg.content.toString()));
// Publisher: Send message to the queue
channel.sendToQueue(
queueName,
Buffer.from(
JSON.stringify({
pattern: 'something',
data: { source: 'node-application' },
})
),
{ replyTo: replyQueueName }
);
}
run()
当我 运行 节点和 Nest.JS 应用程序时,Nest.JS 从 Node.JS 发布者获取消息,但 Node.JS 消费者永远不会打电话回复
解决方法是在 Node.JS 应用程序发送的数据中添加一个 id
键:
// ...
// Publisher: Send message to the queue
channel.sendToQueue(
queueName,
Buffer.from(
JSON.stringify({
// Add the `id` key here so the Node.js consumer will get the message in the reply queue
id: '',
pattern: 'something',
data: { source: 'node-application' },
})
),
{ replyTo: replyQueueName }
);
// ...
详细解释(在Nest.JS源代码中)
这是因为在 server-rmq.ts
文件的 handleMessage
function 中检查消息的 id
属性 是否为 undefined
// https://github.com/nestjs/nest/blob/026c1bd61c561a3ad24da425d6bca27d47567bfd/packages/microservices/server/server-rmq.ts#L139-L141
public async handleMessage(
message: Record<string, any>,
channel: any,
): Promise<void> {
// ...
if (isUndefined((packet as IncomingRequest).id)) {
return this.handleEvent(pattern, packet, rmqContext);
}
// ...
}
并且handleEvent
function中没有发送消息到回复队列的逻辑,只是处理事件