如何使来自 Lambda 的批处理中的特定 SQS 消息失败?

How do I fail a specific SQS message in a batch from a Lambda?

我有一个带有 SQS 触发器的 Lambda。当它被击中时,来自 SQS 的一批记录进来(我认为通常一次大约 10 条)。如果我 return 来自处理程序的失败状态代码,将重试所有 10 条消息。如果我 return 成功代码,它们将从队列中删除。如果这 10 条消息中有 1 条失败,而我只想重试那条怎么办?

exports.handler = async (event) => {

    for(const e of event.Records){
        try {
            let body = JSON.parse(e.body);
            // do things
        }
        catch(e){
            // one message failed, i want it to be retried
        }        
    }

    // returning this causes ALL messages in 
    // this batch to be removed from the queue
    return {
        statusCode: 200,
        body: 'Finished.'
    };
};

我是否必须手动将那些消息重新添加回队列?或者我可以 return 来自我的处理程序的状态,指示一条消息失败并且应该重试吗?

是的,您必须手动将失败的消息重新添加回队列。

我建议做的是设置一个失败计数,这样如果所有消息都失败了,你可以简单地return所有消息的失败状态,否则如果失败计数小于 10 那么你可以单独发送将失败的消息返回队列。

您需要以不同的方式设计您的应用程序,这里有一些想法不是最好的,但可以解决您的问题。

方案一:

  • 创建 sqs 传递队列 - sq1
  • 根据延迟要求sq2创建延迟队列
  • 创建死信队列sdl
  • 现在在 lambda 函数中,如果消息在 sq1 中失败,则在 sq1 上删除它并将其放在 sq2 上以重试任何异步调用的 Lambda 函数都会在事件被丢弃之前重试两次。如果重试失败。

  • 如果重试后再次失败移入死信队列 sdl .

注意:当最初创建并启用 SQS 事件源映射时,或者在一段时间没有流量后首次出现时,Lambda 服务将开始使用五个并行长轮询连接轮询 SQS 队列,根据 AWS文档中,从 AWS Lambda 到 SQS 的长轮询的默认持续时间是 20 秒。

方案二:

使用 AWS StepFunction

StepFunction 将调用 lambda 并在需要时使用可配置的指数退避处理失败时的重试逻辑。

**解决方案 3:**

CloudWatch 计划事件触发轮询 FAILED 的 Lambda 函数。

给定事件源的错误处理取决于 Lambda 的调用方式。 Amazon CloudWatch Events 异步调用您的 Lambda 函数。

您必须在成功处理后以编程方式删除每条消息。

因此,如果任何消息失败,您可以将标志设置为 true,并且根据它,您可以在批量处理所有消息后引发错误,因此成功的消息将被删除,其他消息将根据重试策略。

所以按照下面的逻辑,只有失败和未处理的消息才会被重试。

import boto3

sqs = boto3.client("sqs")

def handler(event, context):
    for message in event['records']:
        queue_url = "form queue url recommended to set it as env variable"
        message_body = message["body"]
        print("do some processing :)")
        message_receipt_handle = message["receiptHandle"]
        sqs.delete_message(
            QueueUrl=queue_url,
            ReceiptHandle=message_receipt_handle
        )

还有一种方法是将处理成功的消息id保存到变量中,根据消息id

进行batch删除操作
response = client.delete_message_batch(
    QueueUrl='string',
    Entries=[
        {
            'Id': 'string',
            'ReceiptHandle': 'string'
        },
    ]
)

根据 AWS documentation,SQS 事件源映射现在支持开箱即用地处理部分故障。链接文章的要点如下:

  1. 在您的事件源映射配置中包含 ReportBatchItemFailures
  2. 失败时的响应语法必须修改为 {"batchItemFailures": [{"itemIdentifier": "id2"},{"itemIdentifier": "id4"}]},其中 id2 和 id4 是批处理中失败的 meesageIds
  3. 按原样引用文档:

Lambda treats a batch as a complete success if your function returns any of the following:

An empty batchItemFailures list

A null batchItemFailures list

An empty EventResponse

A null EventResponse

Lambda treats a batch as a complete failure if your function returns any of the following:

An invalid JSON response

An empty string itemIdentifier

A null itemIdentifier

An itemIdentifier with a bad key name

An itemIdentifier value with a message ID that doesn't exist

根据documentation. But one of the AWS labs example points to its usage in SAM,SAM 支持尚不可用,但在测试时对我有效

AWS 支持部分批处理响应。这是打字稿代码的示例

type Result = {
  itemIdentifier: string
  status: 'failed' | 'success'
}

const isFulfilled = <T>(
  result: PromiseFulfilledResult<T> | PromiseRejectedResult
): result is PromiseFulfilledResult<T> => result.status === 'fulfilled'

const isFailed = (
  result: PromiseFulfilledResult<Result>
): result is PromiseFulfilledResult<
  Omit<Result, 'status'> & { status: 'failed' }
> => result.value.status === 'failed'

const results = await Promise.allSettled(
 sqsEvent.Records.map(async (record) => {
   try {
     return { status: 'success', itemIdentifier: record.messageId }
   } catch(e) {
     console.error(e);
     return { status: 'failed', itemIdentifier: record.messageId }
   }
  })
)

return results
    .filter(isFulfilled)
    .filter(isFailed)
    .map((result) => ({
      itemIdentifier: result.value.itemIdentifier,
    }))