如何使用 amqp.node 库中的 consume 方法获取所有消息?

How do I get all messages using the consume method of the amqp.node library?

大家好。
你们能帮我解决 Node.js 中的一个异步问题吗?

这个问题

我正在使用 amqplib 模块与 RabbitMQ 交互。该模块提供了 consume 方法,用来接收来自 RabbitMQ 的消息。但是这个方法首先返回一个 Promise,表示消费者已经启动;在这个 Promise 完成之后,它才通过回调把 RabbitMQ 中的消息逐条传给我。我不知道如何得知所有消息都已经送达我的 Node.js 应用程序。

为了进一步说明,下面是我的代码;我在代码末尾的注释中写明了我想要的效果:

/**
 * Test script from the question: consume a RabbitMQ queue through amqplib
 * (amqp.node) and show that the "finished" log line is printed before the
 * queued messages are delivered.
 *
 * Every statement is terminated with a semicolon on purpose: without one after
 * the last declaration, the parenthesised async function below would be parsed
 * as a call on the string 'fanout' and the script would throw a TypeError.
 */
const amqp = require('amqplib');

const configConnection = { /* ..config options */ };
const queue = 'users';
const exchange = 'users.exchange';
const type = 'fanout';

/**
 * Anonymous async function, invoked immediately, so that async/await can be
 * used for the connection setup (the original author targeted
 * Node.js >= 8.5.0).
 */
(async () => {
  const conn = await amqp.connect(configConnection);
  const channel = await conn.createChannel();
  await channel.assertExchange(exchange, type);

  // assertQueue replies with e.g.
  // { queue: 'users', messageCount: 10, consumerCount: 0 }
  const { queue: queueName } = await channel.assertQueue(queue);

  // The bindQueue reply is not a queue description (amqplib documents it as an
  // empty object), so the queue name is kept in its own binding instead of
  // being read back from that reply.
  await channel.bindQueue(queueName, exchange, '');

  // consume() resolves as soon as the consumer is registered; the messages
  // themselves are handed to logMessage afterwards.
  //
  // noAck: false means the broker expects every delivery to be acknowledged.
  // NOTE(review): this script never calls channel.ack(), so deliveries stay
  // unacknowledged - confirm whether that is intended.
  await channel.consume(queueName, logMessage, { noAck: false });

  // This is the line the question is about: it runs right after the consumer
  // is registered, not after the queued messages have been received.
  console.log('reading for query finish');

  /**
   * Consumer callback: prints the body of one delivered message.
   *
   * @param {?{content: Buffer}} msg - the delivery; amqplib passes null when
   *   the consumer is cancelled by the broker, which is ignored here.
   */
  function logMessage(msg) {
    if (msg === null) {
      return;
    }
    console.log("[*] recieved: '%s'", msg.content.toString());
  }
})().catch((err) => {
  // Surface connection/channel failures instead of leaving the promise
  // rejection unhandled.
  console.error(err);
  process.exitCode = 1;
});

/**
 * Observed output:
 *   reading for query finish
 *   [*] recieved: 'message content'
 *   [*] recieved: 'message content'
 *   [*] recieved: 'message content'
 *   ...
 *
 * Wanted instead: print 'reading for query finish' only after every queued
 * message has been handed to logMessage.
 *
 * Question: how can this be done?
 */

我找到了问题的答案

答案是:同时使用 EventEmitter 和 Promise。

(对我来说)关键的一行就在这里:
// Suspend the surrounding async function until 'consumeDone' is emitted once:
// the one-shot listener registered with once() is the promise's resolve callback.
await new Promise(resolve => eventEmitter.once('consumeDone', resolve))

所以最终的代码是:

/**
 * Test script from the answer: read the messages that are already waiting in a
 * RabbitMQ queue through amqplib (amqp.node) and continue only once all of
 * them have been received.
 *
 * An EventEmitter bridges the consume callback and the async function: the
 * callback emits 'consumeDone' after the expected number of messages, and the
 * async function awaits a promise that resolves on that event.
 *
 * Every statement is terminated with a semicolon on purpose: without one after
 * the last declaration, the parenthesised async function below would be parsed
 * as a call on the string 'fanout' and the script would throw a TypeError.
 */
const amqp = require('amqplib');
const EventEmitter = require('events');

const eventEmitter = new EventEmitter();
// Upper bound, in milliseconds, on how long to wait for 'consumeDone'.
const timeout = 10000;
const configConnection = { /* ..config options */ };
const queue = 'users';
const exchange = 'users.exchange';
const type = 'fanout';

/**
 * Anonymous async function, invoked immediately, so that async/await can be
 * used for the connection setup (the original author targeted
 * Node.js >= 8.5.0).
 */
(async () => {
  const conn = await amqp.connect(configConnection);
  const channel = await conn.createChannel();
  await channel.assertExchange(exchange, type);

  // assertQueue replies with e.g.
  // { queue: 'users', messageCount: 10, consumerCount: 0 }
  // messageCount is a snapshot: messages published later are not counted.
  const { queue: queueName, messageCount } = await channel.assertQueue(queue);

  // The bindQueue reply is not a queue description (amqplib documents it as an
  // empty object), so the queue name is not read back from that reply.
  await channel.bindQueue(queueName, exchange, '');

  // Register the one-shot listener BEFORE starting the consumer, so that a
  // 'consumeDone' emitted by an early delivery cannot be missed.
  const consumeDone = new Promise(resolve => eventEmitter.once('consumeDone', resolve));

  // noAck: false means the broker expects every delivery to be acknowledged.
  // NOTE(review): this script never calls channel.ack(), so deliveries stay
  // unacknowledged - confirm whether that is intended.
  await channel.consume(queueName, logMessage(messageCount), { noAck: false });

  // Nothing was waiting in the queue: the callback will never reach the
  // expected count, so finish right away instead of sitting out the timeout.
  if (messageCount === 0) {
    eventEmitter.emit('consumeDone');
  }

  // Safety net in case the expected number of messages never arrives.
  const timer = setTimeout(() => eventEmitter.emit('consumeDone'), timeout);
  await consumeDone;
  // Do not leave the fallback timer pending once we are done.
  clearTimeout(timer);

  console.log('reading for query finish');

  /**
   * Builds the consumer callback.
   *
   * @param {number} messageCount - number of messages expected from the queue.
   * @returns {function(?{content: Buffer}): void} callback that prints each
   *   message body and emits 'consumeDone' once `messageCount` messages have
   *   been received.
   */
  function logMessage(messageCount) {
    let received = 0;

    return msg => {
      // amqplib passes null when the broker cancels the consumer; no further
      // deliveries will follow, so stop waiting.
      if (msg === null) {
        eventEmitter.emit('consumeDone');
        return;
      }

      console.log("[*] recieved: '%s'", msg.content.toString());

      received += 1;
      if (received === messageCount) {
        eventEmitter.emit('consumeDone');
      }
    };
  }
})().catch((err) => {
  // Surface connection/channel failures instead of leaving the promise
  // rejection unhandled.
  console.error(err);
  process.exitCode = 1;
});

试试下面的代码:

let amqp = require('amqplib/callback_api');

/**
 * Connects to RabbitMQ, collects every message that was waiting in the queue
 * when it was asserted, prints them as a single array and terminates the
 * process.
 *
 * The expected number of messages is the `messageCount` reported by
 * assertQueue, i.e. a snapshot: messages published afterwards are not waited
 * for. An empty queue prints `[]` and exits immediately.
 *
 * Connection, channel and queue errors are rethrown from the respective
 * callbacks.
 */
function init() {

    // NOTE(review): connection settings, including credentials, are hard-coded
    // for the example.
    const configConnection = {
        protocol: 'amqp',
        hostname: 'localhost',
        port: 5672,
        username: 'root',
        password: '1111',
        heartbeat: 60,
    };
    const queue_name = 'queue_name';
    const messages = [];

    amqp.connect(configConnection, function (error, connect) {

        if (error) {
            throw error;
        }

        // Create channel and get info about queue
        connect.createChannel(function (error1, channel) {

            if (error1) {
                throw error1;
            }

            channel.assertQueue(queue_name, {durable: true}, (error2, result) => {

                // Without this check a failed assert would only show up as a
                // TypeError on `result.messageCount`.
                if (error2) {
                    throw error2;
                }

                // Number of messages waiting in the queue right now
                const messageCount = result.messageCount;

                // Nothing to wait for: the consume callback would never fire,
                // so report the empty result and exit straight away.
                if (messageCount === 0) {
                    console.log(messages);
                    process.exit();
                    return;
                }

                // Consume the queue; noAck: true means no acknowledgements are
                // sent for the received messages.
                channel.consume(queue_name, function (msg) {

                    // amqplib passes null when the consumer is cancelled.
                    if (msg === null) {
                        return;
                    }

                    messages.push(msg.content.toString());

                    // Show all messages and exit
                    if (messages.length === messageCount) {
                        console.log(messages);
                        process.exit();
                    }

                }, {
                    noAck: true
                });
            });
        });
    });
}

// Run the example as soon as the script is loaded.
init();