在 NodeJS 中获取来自 AWS SQS 的所有消息
Get all messages from AWS SQS in NodeJS
我有以下从 aws SQS 获取消息的函数,问题是我一次获取一条消息,我希望获取所有消息,因为我需要检查每条消息的 ID:
function getSQSMessages() {
const params = {
QueueUrl: 'some url',
};
sqs.receiveMessage(params, (err, data) => {
if(err) {
console.log(err, err.stack)
return(err);
}
return data.Messages;
});
};
function sendMessagesBack() {
return new Promise((resolve, reject) => {
if(Array.isArray(getSQSMessages())) {
resolve(getSQSMessages());
} else {
reject(getSQSMessages());
};
});
};
函数 sendMessagesBack() 用于另一个 async/await 函数。
我不确定如何获取所有消息,因为我正在研究如何获取它们,人们提到了循环,但我不知道如何在我的案例中实现它。
我假设我必须将 sqs.receiveMessage() 放在一个循环中,但后来我对我需要检查什么以及何时停止循环感到困惑,这样我就可以得到每条消息的ID?
如果有人有任何提示,请分享。
谢谢。
我建议你使用 Promise api,它会让你立即使用 async/await 语法。
const { Messages } = await sqs.receiveMessage(params).promise();
// Messages will contain all your needed info
await sqs.sendMessage(params).promise();
这样,您就不需要用 Promises 包装回调 API。
SQS 在响应中 return 不超过 10 条消息。要获取所有可用消息,您需要递归调用 getSQSMessages 函数。
如果您 return 来自 getSQSMessages 的承诺,您可以这样做。
getSQSMessages()
.then(data => {
if(!data.Messages || data.Messages.length === 0){
// no messages are available. return
}
// continue processing for each message or push the messages into array and call
//getSQSMessages function again.
});
您永远无法保证获得队列中的所有消息,除非在您获得其中一些消息后,将它们从队列中删除 - 从而确保下一个请求 return 是不同的记录选择.
每个请求会return 'upto' 10条消息,如果你不删除它们,那么下一次请求'upto' 10条消息很有可能会return 混合了您已经看到的消息和一些新消息 - 所以您永远不会真正知道什么时候看到了它们。
队列可能不是您用例中使用的正确工具 - 但由于我不知道您的用例,所以很难说。
我知道这有点死机,但昨晚我在尝试从 SQS 中的死信队列中提取一些所有消息时来到这里。虽然接受的答案“你不能保证从队列中获取所有消息”是绝对正确的,但我确实想为任何可能登陆这里并且需要绕过每个请求的 10
消息限制的人提供答案来自 AWS。
依赖关系
就我而言,我的项目中已经有一些依赖项,我曾使用它们来简化生活。
lodash
- 这是我们在代码中用来帮助实现功能的东西。我不认为我在下面使用它,但我将它包括在内,因为它在文件中。
cli-progress
- 这为您的 CLI 提供了一个漂亮的小进度条。
免责声明
在解决与另一个系统集成的一些生产错误时,将以下内容放在一起。我们的 DLQ 消息包含一些标识符,我需要这些标识符来制定云监视查询以进行故障排除。鉴于这是 AWS 中的两个不同 GUI,来回切换很麻烦,因为我们的 AWS 会话是通过联合形式进行的,并且会话最多只持续一个小时。
剧本
#!/usr/bin/env node
const _ = require('lodash');
const aswSdk = require('aws-sdk');
const cliProgress = require('cli-progress');
const queueUrl = 'https://[put-your-url-here]';
const queueRegion = 'us-west-1';
const getMessages = async (sqs) => {
const resp = await sqs.receiveMessage({
QueueUrl: queueUrl,
MaxNumberOfMessages: 10,
}).promise();
return resp.Messages;
};
const main = async () => {
const sqs = new aswSdk.SQS({ region: queueRegion });
// First thing we need to do is get the current number of messages in the DLQ.
const attributes = await sqs.getQueueAttributes({
QueueUrl: queueUrl,
AttributeNames: ['All'], // Probably could thin this down but its late
}).promise();
const numberOfMessage = Number(attributes.Attributes.ApproximateNumberOfMessages);
// Next we create a in-memory cache for the messages
const allMessages = {};
let running = true;
// Honesty here: The examples we have in existing code use the multi-bar. It was about 10PM and I had 28 DLQ messages I was looking into. I didn't feel it was worth converting the multi-bar to a single-bar. Look into the docs on the github page if this is really a sticking point for you.
const progress = new cliProgress.MultiBar({
format: ' {bar} | {name} | {value}/{total}',
hideCursor: true,
clearOnComplete: true,
stopOnComplete: true
}, cliProgress.Presets.shades_grey);
const progressBar = progress.create(numberOfMessage, 0, { name: 'Messages' });
// TODO: put in a time limit to avoid an infinite loop.
// NOTE: For 28 messages I managed to get them all with this approach in about 15 seconds. When/if I cleanup this script I plan to add the time based short-circuit at that point.
while (running) {
// Fetch all the messages we can from the queue. The number of messages is not guaranteed per the AWS documentation.
let messages = await getMessages(sqs);
for (let i = 0; i < messages.length; i++) {
// Loop though the existing messages and only copy messages we have not already cached.
let message = messages[i];
let data = allMessages[message.MessageId];
if (data === undefined) {
allMessages[message.MessageId] = message;
}
}
// Update our progress bar with the current progress
const discoveredMessageCount = Object.keys(allMessages).length;
progressBar.update(discoveredMessageCount);
// Give a quick pause just to make sure we don't get rate limited or something
await new Promise((resolve) => setTimeout(resolve, 1000));
running = discoveredMessageCount !== numberOfMessage;
}
// Now that we have all the messages I printed them to console so I could copy/paste the output into LibreCalc (excel-like tool). I split on the semicolon for rows out of habit since sometimes similar scripts deal with data that has commas in it.
const keys = Object.keys(allMessages);
console.log('Message ID;ID');
for (let i = 0; i < keys.length; i++) {
const message = allMessages[keys[i]];
const decodedBody = JSON.parse(message.Body);
console.log(`${message.MessageId};${decodedBody.id}`);
}
};
main();
我有以下从 aws SQS 获取消息的函数,问题是我一次获取一条消息,我希望获取所有消息,因为我需要检查每条消息的 ID:
function getSQSMessages() {
const params = {
QueueUrl: 'some url',
};
sqs.receiveMessage(params, (err, data) => {
if(err) {
console.log(err, err.stack)
return(err);
}
return data.Messages;
});
};
function sendMessagesBack() {
return new Promise((resolve, reject) => {
if(Array.isArray(getSQSMessages())) {
resolve(getSQSMessages());
} else {
reject(getSQSMessages());
};
});
};
函数 sendMessagesBack() 用于另一个 async/await 函数。 我不确定如何获取所有消息,因为我正在研究如何获取它们,人们提到了循环,但我不知道如何在我的案例中实现它。 我假设我必须将 sqs.receiveMessage() 放在一个循环中,但后来我对我需要检查什么以及何时停止循环感到困惑,这样我就可以得到每条消息的ID?
如果有人有任何提示,请分享。 谢谢。
我建议你使用 Promise api,它会让你立即使用 async/await 语法。
const { Messages } = await sqs.receiveMessage(params).promise();
// Messages will contain all your needed info
await sqs.sendMessage(params).promise();
这样,您就不需要用 Promises 包装回调 API。
SQS 在响应中 return 不超过 10 条消息。要获取所有可用消息,您需要递归调用 getSQSMessages 函数。 如果您 return 来自 getSQSMessages 的承诺,您可以这样做。
getSQSMessages()
.then(data => {
if(!data.Messages || data.Messages.length === 0){
// no messages are available. return
}
// continue processing for each message or push the messages into array and call
//getSQSMessages function again.
});
您永远无法保证获得队列中的所有消息,除非在您获得其中一些消息后,将它们从队列中删除 - 从而确保下一个请求 return 是不同的记录选择.
每个请求会return 'upto' 10条消息,如果你不删除它们,那么下一次请求'upto' 10条消息很有可能会return 混合了您已经看到的消息和一些新消息 - 所以您永远不会真正知道什么时候看到了它们。
队列可能不是您用例中使用的正确工具 - 但由于我不知道您的用例,所以很难说。
我知道这有点死机,但昨晚我在尝试从 SQS 中的死信队列中提取一些所有消息时来到这里。虽然接受的答案“你不能保证从队列中获取所有消息”是绝对正确的,但我确实想为任何可能登陆这里并且需要绕过每个请求的 10
消息限制的人提供答案来自 AWS。
依赖关系
就我而言,我的项目中已经有一些依赖项,我曾使用它们来简化生活。
lodash
- 这是我们在代码中用来帮助实现功能的东西。我不认为我在下面使用它,但我将它包括在内,因为它在文件中。cli-progress
- 这为您的 CLI 提供了一个漂亮的小进度条。
免责声明
在解决与另一个系统集成的一些生产错误时,将以下内容放在一起。我们的 DLQ 消息包含一些标识符,我需要这些标识符来制定云监视查询以进行故障排除。鉴于这是 AWS 中的两个不同 GUI,来回切换很麻烦,因为我们的 AWS 会话是通过联合形式进行的,并且会话最多只持续一个小时。
剧本
#!/usr/bin/env node
const _ = require('lodash');
const aswSdk = require('aws-sdk');
const cliProgress = require('cli-progress');
const queueUrl = 'https://[put-your-url-here]';
const queueRegion = 'us-west-1';
const getMessages = async (sqs) => {
const resp = await sqs.receiveMessage({
QueueUrl: queueUrl,
MaxNumberOfMessages: 10,
}).promise();
return resp.Messages;
};
const main = async () => {
const sqs = new aswSdk.SQS({ region: queueRegion });
// First thing we need to do is get the current number of messages in the DLQ.
const attributes = await sqs.getQueueAttributes({
QueueUrl: queueUrl,
AttributeNames: ['All'], // Probably could thin this down but its late
}).promise();
const numberOfMessage = Number(attributes.Attributes.ApproximateNumberOfMessages);
// Next we create a in-memory cache for the messages
const allMessages = {};
let running = true;
// Honesty here: The examples we have in existing code use the multi-bar. It was about 10PM and I had 28 DLQ messages I was looking into. I didn't feel it was worth converting the multi-bar to a single-bar. Look into the docs on the github page if this is really a sticking point for you.
const progress = new cliProgress.MultiBar({
format: ' {bar} | {name} | {value}/{total}',
hideCursor: true,
clearOnComplete: true,
stopOnComplete: true
}, cliProgress.Presets.shades_grey);
const progressBar = progress.create(numberOfMessage, 0, { name: 'Messages' });
// TODO: put in a time limit to avoid an infinite loop.
// NOTE: For 28 messages I managed to get them all with this approach in about 15 seconds. When/if I cleanup this script I plan to add the time based short-circuit at that point.
while (running) {
// Fetch all the messages we can from the queue. The number of messages is not guaranteed per the AWS documentation.
let messages = await getMessages(sqs);
for (let i = 0; i < messages.length; i++) {
// Loop though the existing messages and only copy messages we have not already cached.
let message = messages[i];
let data = allMessages[message.MessageId];
if (data === undefined) {
allMessages[message.MessageId] = message;
}
}
// Update our progress bar with the current progress
const discoveredMessageCount = Object.keys(allMessages).length;
progressBar.update(discoveredMessageCount);
// Give a quick pause just to make sure we don't get rate limited or something
await new Promise((resolve) => setTimeout(resolve, 1000));
running = discoveredMessageCount !== numberOfMessage;
}
// Now that we have all the messages I printed them to console so I could copy/paste the output into LibreCalc (excel-like tool). I split on the semicolon for rows out of habit since sometimes similar scripts deal with data that has commas in it.
const keys = Object.keys(allMessages);
console.log('Message ID;ID');
for (let i = 0; i < keys.length; i++) {
const message = allMessages[keys[i]];
const decodedBody = JSON.parse(message.Body);
console.log(`${message.MessageId};${decodedBody.id}`);
}
};
main();