批量处理 AWS Lambda 消息

Processing AWS Lambda messages in Batches

我想知道一些事情,但我真的找不到相关信息。也许这不是要走的路,但我只是想知道。

这是关于 Lambda 批量工作的。我知道我可以设置 Lambda 来使用批处理消息。在我的 Lambda 函数中,我迭代每条消息,如果失败,Lambda 将退出。循环又开始了。

我想知道稍微不同的方法 假设我有三个消息:ABC。我也分批服用。现在,如果消息 B 失败(例如 API 调用失败),我 return 消息 B 到 SQS 并继续处理消息 C.

可能吗?如果是,这是一个好方法吗?因为我发现我需要在 Lambda 中实现一些额外的复杂性,而不是什么。

谢谢

与所有架构决策一样,这取决于您的目标以及您愿意为更复杂的交易付出什么。使用 SQS 将允许您乱序处理消息,这样重试就不会阻塞其他消息。这是否值得复杂化取决于您担心消息被阻止的原因。

我建议阅读有关 Lambda retry behavior 和死信队列的内容。

如果您只想重试一批消息中失败的消息,这是完全可行的,但会稍微增加一些复杂性。

实现此目的的一种可能方法是遍历事件列表(例如 [eventA、eventB、eventC]),并且对于每次执行,如果事件失败,则附加到失败事件列表。然后,有一个最终案例,检查失败事件列表中是否有任何内容,如果有,手动将消息发送回 SQS(使用 SQS sendMessageBatch ).

但是,您应该注意,这会将事件放到队列的末尾,因为您是手动将它们插回。

任何事情都可以是 "good approach",只要它能解决您遇到的问题而不复杂,在这种情况下,必须重新执行成功事件的问题绝对是您可以解决的问题这种方式。

有一篇很棒的文章 here。与您相关的部分是...

  • 使用 batchSize 1,以便消息自行成功或失败。
  • 确保您的处理是幂等的,因此除了额外的处理成本之外,重新处理消息是无害的。
  • 处理函数代码中的错误,可能是通过捕获它们并将消息发送到死信队列以供进一步处理。
  • 成功处理消息后,在您的函数中手动调用 DeleteMessage API。

最后一个要点是我如何设法处理同样的问题。不要立即 returning 错误,而是存储它们或注意发生错误,然后继续处理批处理中的其余消息。在处理结束时,return 或引发错误,以便 SQS -> lambda 触发器知道不要删除失败的消息。所有成功的消息都已被您的 lambda 处理程序删除。

sqs = boto3.client('sqs')

def handler(event, context):
    failed = False

    for msg in event['Records']:
        try:
            # Do something with the message.
            handle_message(msg)
        except Exception:
            # Ok it failed, but allow the loop to finish.
            logger.exception('Failed to handle message')
            failed = True
        else:
            # The message was handled successfully. We can delete it now.
            sqs.delete_message(
                QueueUrl=<queue_url>,
                ReceiptHandle=msg['receiptHandle'],
            )

    # It doesn't matter what the error is. You just want to raise here
    # to ensure the trigger doesn't delete any of the failed messages.
    if failed:
        raise RuntimeError('Failed to process one or more messages')

def handle_msg(msg):
    ...

对于 Node.js,查看 https://www.npmjs.com/package/@middy/sqs-partial-batch-failure

const middy = require('@middy/core')
const sqsBatch = require('@middy/sqs-partial-batch-failure')

const originalHandler = (event, context, cb) => {
  const recordPromises = event.Records.map(async (record, index) => { /* Custom message processing logic */ })
  return Promise.allSettled(recordPromises)
}

const handler = middy(originalHandler)
  .use(sqsBatch())

查看 https://medium.com/@brettandrews/handling-sqs-partial-batch-failures-in-aws-lambda-d9d6940a17aa 了解更多详情。

截至 2019 年 11 月,AWS has introduced Bisect On Function Error 的概念,以及最大重试次数。如果您的函数是幂等的,则可以使用它。

在这种方法中,即使批处理中的一项失败,您也应该从函数中抛出错误。 AWS 将批次分成两部分并重试。现在一半的批次应该成功通过。对于另一半,继续该过程,直到隔离坏记录。