如何使用 lambda 函数处理 SQS 队列(不是通过预定事件)?

How to process SQS queue with lambda function (not via scheduled events)?

以下是我正在努力实现的简化方案:

http requests --> (Gateway API + lambda A) --> SQS --> (lambda B ?????) --> DynamoDB

所以它应该如图所示工作:来自许多 http 请求的数据(例如每秒最多 500 个) 由我的 lambda 函数 A 放入 SQS 队列。然后另一个函数 B 处理队列: 最多读取 10 个项目(在某些周期性的基础上)并使用 BatchWriteItem 将它们写入 DynamoDB。

问题是我不知道如何触发第二个 lambda 函数。它应该被频繁调用,每秒多次(或至少每秒一次),因为我需要队列中的所有数据尽快进入 DynamoDB(这就是为什么通过计划事件调用 lambda 函数 B,如所述 here不是一个选项)


为什么我不想在没有 SQS 的情况下直接写入 DynamoDB?

如果我完全避免使用 SQS,那就太好了。我试图用 SQS 解决的问题是 DynamoDB 节流。甚至没有节流本身,而是在使用 AWS SDK 将数据写入 DynamoDB 时处理它的方式:当一条记录写入并限制它们时,AWS SDK 静默重试写入,导致从 http 客户端的请求处理时间增加查看。

所以我想暂时将数据存储在队列中,将响应“200 OK”发送回客户端,然后通过单独的函数处理队列, 使用一个 DynamoDB 的 BatchWriteItem 调用写入多条记录(其中 returns 未处理的项目而不是在节流的情况下自动重试)。 我什至宁愿丢失一些记录而不是增加接收记录和存储在 DynamoDB 中的记录之间的延迟

更新: 如果有人感兴趣,我已经找到了如何让 aws-sdk 在节流的情况下跳过自动重试:有一个特殊参数 maxRetries。无论如何,按照下面的建议使用 Kinesis

不幸的是,您无法直接集成 SQS 和 Lambda。但不要太担心。有解决办法!您需要添加另一项亚马逊服务,您的所有问题都将得到解决。

http requests --> (Gateway API + lambda A) --> SQS + SNS --> lambda B --> DynamoDB

您可以触发对第二个 lambda 服务的 SNS 通知以将其启动。一旦启动,它就可以清空队列并将所有结果写入 DynamoDB。要更好地了解 Lambda 的可能事件源,请查看 these docs.

[这并没有直接回答你的明确问题,所以根据我的经验,它会被否决:)但是,我会回答你试图解决的基本问题。]

我们接收大量传入请求并将它们提供给 AWS Lambda 函数以按节奏写入 DynamoDB 的方式是用 Amazon Kinesis 流替换建议架构中的 SQS。

Kinesis 流可以驱动 AWS Lambda 函数。

Kinesis 流保证任何给定键的传递消息的排序(对于有序的数据库操作很好)。

Kinesis 流让您可以指定有多少 AWS Lambda 函数可以 运行 并行(每个分区一个),这可以与您的 DynamoDB 写入容量相协调。

Kinesis 流可以在一次 AWS Lambda 函数调用中传递多个可用消息,从而允许进一步优化。

注意:真正是AWS Lambda服务从Amazon Kinesis streams读取然后调用函数,而不是Kinesis streams直接调用AWS Lambda;但有时更容易想象为 Kinesis 驱动它。给用户的结果几乎是一样的。

另一种解决方案是将项目添加到 SQS,使用事件调用目标 Lambda 函数,使其异步。

然后异步 Lambda 可以从 SQS 获取任意数量的项目并处理它们。

我还会添加对异步 Lambda 的预定调用,以处理队列中出错的任何项目。

[更新]您现在可以在队列中的新消息上设置 Lambda 触发器

也许更具成本效益的解决方案是将所有内容都保留在 SQS 中(按原样),然后 运行 调用多线程 Lambda 函数来处理队列中的项目的计划事件?

这样,您的队列工作人员就可以完全匹配您的限制。如果队列为空,函数可能会提前完成或在单线程中开始轮询。

对于这种情况,Kinesis 听起来有点矫枉过正——例如,您不需要原始顺序。此外,运行同时使用多个 Lambda 肯定比 运行只使用一个多线程 Lambda 更昂贵。

您的 Lambda 将全部用于 I/O,对 AWS 服务进行外部调用,因此一个函数可能非常适合。

这是我解决这个问题的方法:

HTTP request --> DynamoDb --> Stream --> Lambda Function

在此解决方案中,您必须为 table 设置流。流由您将编写的 Lambda 函数处理,仅此而已。无需使用 SQS 或其他任何东西。

当然,这是一个简化的设计,只适用于简单的问题。对于更复杂的场景,请使用 Kinesis(如其他答案中所述)。

这是一个link to AWS documentation on the topic

自 2018 年 6 月 28 日起,您现在可以使用 SQS 在本地触发 AWS Lambda 函数。不再需要解决方法!

https://aws.amazon.com/blogs/aws/aws-lambda-adds-amazon-simple-queue-service-to-supported-event-sources/

并且在 2019 年 11 月,添加了对 FIFO 队列的支持:

https://aws.amazon.com/blogs/compute/new-for-aws-lambda-sqs-fifo-as-an-event-source/

我相信 AWS 现在已经想出了一种 SQS 可以触发 lambda 函数的方法。所以我想我们可以使用 SQS 来平滑数据到 dynamo 的突发负载,以防您不关心消息的顺序。在他们的博客上查看这个新更新:https://aws.amazon.com/blogs/aws/aws-lambda-adds-amazon-simple-queue-service-to-supported-event-sources/

以下是我从 SQS 队列收集消息的方法:

package au.com.redbarn.aws.lambda2lambda_via_sqs;

import java.util.List;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage;

import lombok.extern.log4j.Log4j2;

@Log4j2
public class SQSConsumerLambda implements RequestHandler<SQSEvent, String> {

    @Override
    public String handleRequest(SQSEvent input, Context context) {

        log.info("message received");

        List<SQSMessage> records = input.getRecords();

        for (SQSMessage record : records) {
            log.info(record.getBody());
        }

        return "Ok";
    }
}

将您的 DynamoDB 代码添加到 handleRequest(),Lambda B 就完成了。