从 Amazon Web Services SQS 轮询消息的最佳方式是什么

What is the best way to poll for messages from Amazon Web Services SQS

轮询来自 Amazon Web Services SQS 的消息的最佳方式是什么?我想在消息进入队列后立即处理消息,我真的不想每秒轮询一次,除非这是唯一的选择?

setInterval(run(), 1000); 我得到了错误

const run: () => Promise<void>
//---------------------------------
No overload matches this call.
  Overload 1 of 2, '(callback: (...args: any[]) => void, ms: number, ...args: any[]): Timeout', gave the following error.
    Argument of type 'Promise<void>' is not assignable to parameter of type '(...args: any[]) => void'.
      Type 'Promise<void>' provides no match for the signature '(...args: any[]): void'.
  Overload 2 of 2, '(handler: TimerHandler, timeout?: number, ...arguments: any[]): number', gave the following error.
    Argument of type 'Promise<void>' is not assignable to parameter of type 'TimerHandler'.
      Type 'Promise<void>' is missing the following properties from type 'Function': apply, call, bind, prototype, and 5 more.ts(2769)`

我的代码...

const QueueUrl = process.env.SQS_QUEUE_URL;
const params = {
  AttributeNames: ['SentTimestamp'],
  MaxNumberOfMessages: 10,
  MessageAttributeNames: ['All'],
  QueueUrl,
  VisibilityTimeout: 20,
  WaitTimeSeconds: 0,
};
const sqs = new SQSClient({ region: process.env.SQS_REGION });

const run = async () => {
  try {
    const data = await sqs.send(new ReceiveMessageCommand(params));
    if (data.Messages) {
      for (const val of data.Messages) {
        const address = val.MessageAttributes.Address.StringValue;
        const amount = Number(val.MessageAttributes.Amount.StringValue);
        createTransaction(address, amount);
        const deleteParams = {
          QueueUrl,
          ReceiptHandle: val.ReceiptHandle,
        };
        try {
          await sqs.send(new DeleteMessageCommand(deleteParams));
        } catch (err) {
          // tslint:disable-next-line: no-console
          console.log('Message Deleted', data);
        }
      }
    } else {
      // tslint:disable-next-line: no-console
      console.log('No messages to delete');
    }
  } catch (err) {
    // tslint:disable-next-line: no-console
    console.log('Receive Error', err);
  }
};
run();

如果您想定期检索消息(例如,每 2-3 分钟),实现此用例的一种方法是编写一个使用 SQS API 和然后调用 receiveMesage(你如何处理消息由你决定)。然后安排要使用 Cloud Watch 事件调用的 Lambda 函数。您可以使用 CRON 表达式来定义调用 Lambda 函数的频率。

调用ReceiveMessage时,可以指定WaitTimeSeconds最多20秒。这被称为 Long Polling

它是这样工作的:

  • 如果队列中有消息,将立即return调用最多 10 条消息
  • 如果队列中没有消息,但在 WaitTimeSeconds 时间段内出现了一些消息,则在消息可用时 return 将立即调用
  • 如果在整个 WaitTimeSeconds 期间队列中没有消息,则调用将 return 没有消息

这意味着您可以循环调用 ReceiveMessage,但如果没有消息,它会在 return 之前最多等待 20 秒。这比“每秒轮询”要好。

或者,您可以将代码放入 AWS Lambda 函数 并将 Amazon SQS 队列配置为该函数的 触发器 。当消息发送到队列时,将自动调用 Lambda 函数,消息通过 event 参数传递到函数(无需调用 ReceiveMessage)。