在 NodeJS 中获取来自 AWS SQS 的所有消息

Get all messages from AWS SQS in NodeJS

我有以下从 aws SQS 获取消息的函数,问题是我一次获取一条消息,我希望获取所有消息,因为我需要检查每条消息的 ID:

function getSQSMessages() {

    const params = {
        QueueUrl: 'some url',
    };

    sqs.receiveMessage(params, (err, data) => {
        if(err) {
            console.log(err, err.stack)
            return(err);
        }
        return data.Messages;
    });

};

function sendMessagesBack() {

    return new Promise((resolve, reject) => {
        if(Array.isArray(getSQSMessages())) {
            resolve(getSQSMessages());
        } else {
            reject(getSQSMessages());
        };
    });

};

函数 sendMessagesBack() 用于另一个 async/await 函数。 我不确定如何获取所有消息,因为我正在研究如何获取它们,人们提到了循环,但我不知道如何在我的案例中实现它。 我假设我必须将 sqs.receiveMessage() 放在一个循环中,但后来我对我需要检查什么以及何时停止循环感到困惑,这样我就可以得到每条消息的ID?

如果有人有任何提示,请分享。 谢谢。

我建议你使用 Promise api,它会让你立即使用 async/await 语法。

const { Messages } = await sqs.receiveMessage(params).promise();
// Messages will contain all your needed info
await sqs.sendMessage(params).promise();

这样,您就不需要用 Promises 包装回调 API。

SQS 在响应中 return 不超过 10 条消息。要获取所有可用消息,您需要递归调用 getSQSMessages 函数。 如果您 return 来自 getSQSMessages 的承诺,您可以这样做。

getSQSMessages()
.then(data => {
  if(!data.Messages || data.Messages.length === 0){
      // no messages are available. return
  }
  // continue processing for each message or push the messages into array and call 
 //getSQSMessages function again. 
});

您永远无法保证获得队列中的所有消息,除非在您获得其中一些消息后,将它们从队列中删除 - 从而确保下一个请求 return 是不同的记录选择.

每个请求会return 'upto' 10条消息,如果你不删除它们,那么下一次请求'upto' 10条消息很有可能会return 混合了您已经看到的消息和一些新消息 - 所以您永远不会真正知道什么时候看到了它们。

队列可能不是您用例中使用的正确工具 - 但由于我不知道您的用例,所以很难说。

我知道这有点死机,但昨晚我在尝试从 SQS 中的死信队列中提取一些所有消息时来到这里。虽然接受的答案“你不能保证从队列中获取所有消息”是绝对正确的,但我确实想为任何可能登陆这里并且需要绕过每个请求的 10 消息限制的人提供答案来自 AWS。

依赖关系

就我而言,我的项目中已经有一些依赖项,我曾使用它们来简化生活。

  • lodash - 这是我们在代码中用来帮助实现功能的东西。我不认为我在下面使用它,但我将它包括在内,因为它在文件中。
  • cli-progress - 这为您的 CLI 提供了一个漂亮的小进度条。

免责声明

在解决与另一个系统集成的一些生产错误时,将以下内容放在一起。我们的 DLQ 消息包含一些标识符,我需要这些标识符来制定云监视查询以进行故障排除。鉴于这是 AWS 中的两个不同 GUI,来回切换很麻烦,因为我们的 AWS 会话是通过联合形式进行的,并且会话最多只持续一个小时。

剧本

#!/usr/bin/env node

const _ = require('lodash');
const aswSdk = require('aws-sdk');
const cliProgress = require('cli-progress');

const queueUrl = 'https://[put-your-url-here]';
const queueRegion = 'us-west-1';

const getMessages = async (sqs) => {
  const resp = await sqs.receiveMessage({
    QueueUrl: queueUrl,
    MaxNumberOfMessages: 10,
  }).promise();

  return resp.Messages;
};

const main = async () => {
  const sqs = new aswSdk.SQS({ region: queueRegion });

  // First thing we need to do is get the current number of messages in the DLQ. 
  const attributes = await sqs.getQueueAttributes({
    QueueUrl: queueUrl,
    AttributeNames: ['All'], // Probably could thin this down but its late
  }).promise();

  const numberOfMessage = Number(attributes.Attributes.ApproximateNumberOfMessages);

  // Next we create a in-memory cache for the messages
  const allMessages = {};
  let running = true;

  // Honesty here: The examples we have in existing code use the multi-bar. It was about 10PM and I had 28 DLQ messages I was looking into. I didn't feel it was worth converting the multi-bar to a single-bar. Look into the docs on the github page if this is really a sticking point for you.
  const progress = new cliProgress.MultiBar({
    format: ' {bar} | {name} | {value}/{total}',
    hideCursor: true,
    clearOnComplete: true,
    stopOnComplete: true
  }, cliProgress.Presets.shades_grey);
  const progressBar = progress.create(numberOfMessage, 0, { name: 'Messages' });

  // TODO: put in a time limit to avoid an infinite loop. 
  // NOTE: For 28 messages I managed to get them all with this approach in about 15 seconds. When/if I cleanup this script I plan to add the time based short-circuit at that point.
  while (running) {
    // Fetch all the messages we can from the queue. The number of messages is not guaranteed per the AWS documentation. 
    let messages = await getMessages(sqs);
    for (let i = 0; i < messages.length; i++) {
      // Loop though the existing messages and only copy messages we have not already cached.
      let message = messages[i];
      let data = allMessages[message.MessageId];
      if (data === undefined) {
        allMessages[message.MessageId] = message;
      }
    }

    // Update our progress bar with the current progress
    const discoveredMessageCount = Object.keys(allMessages).length;
    progressBar.update(discoveredMessageCount);

    // Give a quick pause just to make sure we don't get rate limited or something
    await new Promise((resolve) => setTimeout(resolve, 1000));
    running = discoveredMessageCount !== numberOfMessage;
  }

  // Now that we have all the messages I printed them to console so I could copy/paste the output into LibreCalc (excel-like tool). I split on the semicolon for rows out of habit since sometimes similar scripts deal with data that has commas in it.
  const keys = Object.keys(allMessages);
  console.log('Message ID;ID');
  for (let i = 0; i < keys.length; i++) {
    const message = allMessages[keys[i]];
    const decodedBody = JSON.parse(message.Body);
    console.log(`${message.MessageId};${decodedBody.id}`);
  }
};

main();