AWS SQS 消息在可见性超时后不再可用

AWS SQS messages does not become available again after visibility timeout

这很可能是一件非常简单的事情,但由于某种原因,我的 SQS 消息在可见性超时后不再可用。至少这是我的想法,因为使用 lambda 没有日志条目表明已触发任何重试。

我的用例是另一个 lambda 正在为 SQS 队列提供 JSON 实体,然后需要将这些实体转发。有时要发送的数据太多,以至于接收端使用 HTTP 429 进行响应。

我发送的 lambda(JSON 正文通过 HTTPS)仅在服务使用 HTTP 200 响应时从队列中删除消息,否则我对 receiptHandle 不做任何事情,我认为应该保留队列中的消息。

无论如何,当请求被服务拒绝时,消息不再可用,因此它再也不会尝试发送并永远丢失。

传入 SQS 已设置如下:

关联的 DLQ 的最大接收数为 100

消耗的 lambda 配置为

我的 lambda 中的实际逻辑非常简单,真的。它接收的事件是队列中的 Records。 Lambda 可能一次获得多个记录,但所有记录都是单独处理的。

console.log('Response', response);

if (response.status === 'CREATED') {

  /* some code here */

  const deleteParams = {
    QueueUrl: queueUrl, /* required */
    ReceiptHandle: receiptHandle /* required */
  };

  console.log('Removing record from ', queueUrl, 'with params', deleteParams);    
  await sqs.deleteMessage(deleteParams).promise();
} else {
  /* any record that ends up here, are never seen again :( */
  console.log('Keeping record in', eventSourceARN);
}

做什么:(?!?!11

otherwise I do nothing with the receiptHandle which I think should then keep the message in the queue

那是not now it works:

Lambda polls the queue and invokes your Lambda function synchronously with an event that contains queue messages. Lambda reads messages in batches and invokes your function once for each batch. When your function successfully processes a batch, Lambda deletes its messages from the queue.

当从 Amazon SQS 队列触发 AWS Lambda 函数时,与 SQS 相关的所有活动都由 Lambda 服务处理。 您的代码不应调用任何 Amazon SQS 函数。

消息将通过 event 参数提供给 AWS Lambda 函数。当函数成功退出时,Lambda 服务将从队列中删除消息。

您的代码不应调用 DeleteMessage()

如果您希望表明 一些 邮件未成功处理,您可以使用 partial batch response 来指示哪些邮件已成功处理。然后,AWS Lambda 服务将使队列中不成功的消息再次可用。

感谢所有回答的人。所以我通过阅读文档解决了这个“问题”。

我将在此处针对我自己的问题提供更详细的答案,以防除我之外的其他人在第一时间没有得到它:)

所以函数应该 return batchItemFailures 包含失败消息 ID 的对象。

因此,例如,可以将 Lambda 处理程序设置为

/**
 * Handler
 *
 * @param {*} event SQS event
 * @returns {Object} batch item failures
 */
exports.handler = async (event) => {
  console.log('Starting ' + process.env.AWS_LAMBDA_FUNCTION_NAME);
  console.log('Received event', event);

  event = typeof event === 'object'
    ? event
    : JSON.parse(event);

  const batchItemFailures = await execute(event.Records);

  if (batchItemFailures.length > 0) {
    console.log('Failures', batchItemFailures);
  } else {
    console.log('No failures');
  }

  return {
    batchItemFailures: batchItemFailures
  }
}

并执行处理消息的函数

/**
 * Execute
 *
 * @param {Array} records SQS records
 * @returns {Promise<*[]>} batch item failures
 */
async function execute (records) {
  let batchItemFailures = [];

  for (let index = 0; index < records.length; index++) {
    const record = records[index];

    // ...some async stuff here

    if (someSuccessCondition) {
      console.log('Life is good');
    } else {
      batchItemFailures.push({
        itemIdentifier: record.messageId
      });
    }
  }

  return batchItemFailures;
}