DynamoDB 未收到整个 SQS 消息正文
DynamoDB not receiving the entire SQS message body
我正在从 API 中分批提取数据并将其发送到 SQS 队列。我遇到问题的地方是处理消息以便将数据发送到 DynamoDB。数据集中应该有 147,689 条记录。但是,当运行代码时,有时少于147,689条记录会放入DynamoDB,有时多于147,689条记录会放入DynamoDB,有时会有147,689条记录放入DynamoDB。它并没有始终如一地将 147,689 条记录放入数据库。
我已经尝试了所有我能想到的方法来尝试解决这个问题,包括(使用 Fifo 队列而不是标准队列,增加可见性超时,增加传递超时,使用 uuid.uuid1() 而不是 uuid .uuid4()) 我正在遍历“记录”列表,所以不确定为什么它不处理整个批次。下面是我处理消息并将数据发送到 DynamoDB 的最新代码:
import boto3
import json
import uuid
import time
dynamo = boto3.client("dynamodb", "us-east-1")
def lambda_handler(event, context):
for item in json.loads(event["Records"][0]["body"]):
item["id"] = uuid.uuid1().bytes
for key, value in item.items():
if key == "id":
item[key] = {"B": bytes(value)}
elif key == "year":
item[key] = {"N": str(value)}
elif key == "amt_harvested":
item[key] = {"N": str(value)}
elif key == "consumed":
item[key] = {"N": str(value)}
else:
item[key] = {"S": str(value)}
time.sleep(0.001)
dynamo.put_item(TableName="TableOne", Item=dict(item))
Lambda Event Source Mapping for SQS 将轮询消息并根据批大小(默认为 10)为一批记录调用 Lambda 函数。处理批应通过循环 event["Records"]
数组来完成。
设置批量大小应考虑的关键因素。
- 如果 lambda 处理失败,整个批次将重新发送并由 AWS 重试。如果函数不能接受处理重复记录,batchsize 应该设置为 1。
- 如果在 lambda 中处理单个记录需要 20 毫秒,AWS 仍会向我们收取 100 毫秒(这是最低)的费用,我们可以通过简单地将批处理大小设置为 5 来轻松降低 5 倍的成本。
一直推荐给
- 将批量大小设置得更高,并将 lambda 代码设为幂等。
- 编写 Lambda 代码以处理所有记录,而不管批大小是多少。
我正在从 API 中分批提取数据并将其发送到 SQS 队列。我遇到问题的地方是处理消息以便将数据发送到 DynamoDB。数据集中应该有 147,689 条记录。但是,当运行代码时,有时少于147,689条记录会放入DynamoDB,有时多于147,689条记录会放入DynamoDB,有时会有147,689条记录放入DynamoDB。它并没有始终如一地将 147,689 条记录放入数据库。
我已经尝试了所有我能想到的方法来尝试解决这个问题,包括(使用 Fifo 队列而不是标准队列,增加可见性超时,增加传递超时,使用 uuid.uuid1() 而不是 uuid .uuid4()) 我正在遍历“记录”列表,所以不确定为什么它不处理整个批次。下面是我处理消息并将数据发送到 DynamoDB 的最新代码:
import boto3
import json
import uuid
import time
dynamo = boto3.client("dynamodb", "us-east-1")
def lambda_handler(event, context):
for item in json.loads(event["Records"][0]["body"]):
item["id"] = uuid.uuid1().bytes
for key, value in item.items():
if key == "id":
item[key] = {"B": bytes(value)}
elif key == "year":
item[key] = {"N": str(value)}
elif key == "amt_harvested":
item[key] = {"N": str(value)}
elif key == "consumed":
item[key] = {"N": str(value)}
else:
item[key] = {"S": str(value)}
time.sleep(0.001)
dynamo.put_item(TableName="TableOne", Item=dict(item))
Lambda Event Source Mapping for SQS 将轮询消息并根据批大小(默认为 10)为一批记录调用 Lambda 函数。处理批应通过循环 event["Records"]
数组来完成。
设置批量大小应考虑的关键因素。
- 如果 lambda 处理失败,整个批次将重新发送并由 AWS 重试。如果函数不能接受处理重复记录,batchsize 应该设置为 1。
- 如果在 lambda 中处理单个记录需要 20 毫秒,AWS 仍会向我们收取 100 毫秒(这是最低)的费用,我们可以通过简单地将批处理大小设置为 5 来轻松降低 5 倍的成本。
一直推荐给
- 将批量大小设置得更高,并将 lambda 代码设为幂等。
- 编写 Lambda 代码以处理所有记录,而不管批大小是多少。