如何使来自 Lambda 的批处理中的特定 SQS 消息失败?
How do I fail a specific SQS message in a batch from a Lambda?
我有一个带有 SQS 触发器的 Lambda。当它被击中时,来自 SQS 的一批记录进来(我认为通常一次大约 10 条)。如果我 return 来自处理程序的失败状态代码,将重试所有 10 条消息。如果我 return 成功代码,它们将从队列中删除。如果这 10 条消息中有 1 条失败,而我只想重试那条怎么办?
exports.handler = async (event) => {
for(const e of event.Records){
try {
let body = JSON.parse(e.body);
// do things
}
catch(e){
// one message failed, i want it to be retried
}
}
// returning this causes ALL messages in
// this batch to be removed from the queue
return {
statusCode: 200,
body: 'Finished.'
};
};
我是否必须手动将那些消息重新添加回队列?或者我可以 return 来自我的处理程序的状态,指示一条消息失败并且应该重试吗?
是的,您必须手动将失败的消息重新添加回队列。
我建议做的是设置一个失败计数,这样如果所有消息都失败了,你可以简单地return所有消息的失败状态,否则如果失败计数小于 10 那么你可以单独发送将失败的消息返回队列。
您需要以不同的方式设计您的应用程序,这里有一些想法不是最好的,但可以解决您的问题。
方案一:
- 创建 sqs 传递队列 - sq1
- 根据延迟要求sq2创建延迟队列
- 创建死信队列sdl
现在在 lambda 函数中,如果消息在 sq1 中失败,则在 sq1 上删除它并将其放在 sq2 上以重试任何异步调用的 Lambda 函数都会在事件被丢弃之前重试两次。如果重试失败。
如果重试后再次失败移入死信队列 sdl .
注意:当最初创建并启用 SQS 事件源映射时,或者在一段时间没有流量后首次出现时,Lambda 服务将开始使用五个并行长轮询连接轮询 SQS 队列,根据 AWS文档中,从 AWS Lambda 到 SQS 的长轮询的默认持续时间是 20 秒。
方案二:
使用 AWS StepFunction
StepFunction 将调用 lambda 并在需要时使用可配置的指数退避处理失败时的重试逻辑。
- https://docs.aws.amazon.com/step-functions/latest/dg/concepts-error-handling.html
- https://cloudacademy.com/blog/aws-step-functions-a-serverless-orchestrator/
**解决方案 3:**
CloudWatch 计划事件触发轮询 FAILED 的 Lambda 函数。
给定事件源的错误处理取决于 Lambda 的调用方式。 Amazon CloudWatch Events 异步调用您的 Lambda 函数。
- https://docs.aws.amazon.com/lambda/latest/dg/retries-on-errors.html
- https://engineering.opsgenie.com/aws-lambda-performance-series-part-2-an-analysis-on-async-lambda-fail-retry-behaviour-and-dead-b84620af406
- https://dzone.com/articles/asynchronous-retries-with-aws-sqs
- https://medium.com/@ron_73212/how-to-handle-aws-lambda-errors-like-a-pro-e5455b013d10
您必须在成功处理后以编程方式删除每条消息。
因此,如果任何消息失败,您可以将标志设置为 true,并且根据它,您可以在批量处理所有消息后引发错误,因此成功的消息将被删除,其他消息将根据重试策略。
所以按照下面的逻辑,只有失败和未处理的消息才会被重试。
import boto3
sqs = boto3.client("sqs")
def handler(event, context):
for message in event['records']:
queue_url = "form queue url recommended to set it as env variable"
message_body = message["body"]
print("do some processing :)")
message_receipt_handle = message["receiptHandle"]
sqs.delete_message(
QueueUrl=queue_url,
ReceiptHandle=message_receipt_handle
)
还有一种方法是将处理成功的消息id保存到变量中,根据消息id
进行batch删除操作
response = client.delete_message_batch(
QueueUrl='string',
Entries=[
{
'Id': 'string',
'ReceiptHandle': 'string'
},
]
)
根据 AWS documentation,SQS 事件源映射现在支持开箱即用地处理部分故障。链接文章的要点如下:
- 在您的事件源映射配置中包含 ReportBatchItemFailures
- 失败时的响应语法必须修改为
{"batchItemFailures": [{"itemIdentifier": "id2"},{"itemIdentifier": "id4"}]}
,其中 id2 和 id4 是批处理中失败的 meesageIds
- 按原样引用文档:
Lambda treats a batch as a complete success if your function returns
any of the following:
An empty batchItemFailures list
A null batchItemFailures list
An empty EventResponse
A null EventResponse
Lambda treats a batch as a complete failure if your function returns
any of the following:
An invalid JSON response
An empty string itemIdentifier
A null itemIdentifier
An itemIdentifier with a bad key name
An itemIdentifier value with a message ID that doesn't exist
根据documentation. But one of the AWS labs example points to its usage in SAM,SAM 支持尚不可用,但在测试时对我有效
AWS 支持部分批处理响应。这是打字稿代码的示例
type Result = {
itemIdentifier: string
status: 'failed' | 'success'
}
const isFulfilled = <T>(
result: PromiseFulfilledResult<T> | PromiseRejectedResult
): result is PromiseFulfilledResult<T> => result.status === 'fulfilled'
const isFailed = (
result: PromiseFulfilledResult<Result>
): result is PromiseFulfilledResult<
Omit<Result, 'status'> & { status: 'failed' }
> => result.value.status === 'failed'
const results = await Promise.allSettled(
sqsEvent.Records.map(async (record) => {
try {
return { status: 'success', itemIdentifier: record.messageId }
} catch(e) {
console.error(e);
return { status: 'failed', itemIdentifier: record.messageId }
}
})
)
return results
.filter(isFulfilled)
.filter(isFailed)
.map((result) => ({
itemIdentifier: result.value.itemIdentifier,
}))
我有一个带有 SQS 触发器的 Lambda。当它被击中时,来自 SQS 的一批记录进来(我认为通常一次大约 10 条)。如果我 return 来自处理程序的失败状态代码,将重试所有 10 条消息。如果我 return 成功代码,它们将从队列中删除。如果这 10 条消息中有 1 条失败,而我只想重试那条怎么办?
exports.handler = async (event) => {
for(const e of event.Records){
try {
let body = JSON.parse(e.body);
// do things
}
catch(e){
// one message failed, i want it to be retried
}
}
// returning this causes ALL messages in
// this batch to be removed from the queue
return {
statusCode: 200,
body: 'Finished.'
};
};
我是否必须手动将那些消息重新添加回队列?或者我可以 return 来自我的处理程序的状态,指示一条消息失败并且应该重试吗?
是的,您必须手动将失败的消息重新添加回队列。
我建议做的是设置一个失败计数,这样如果所有消息都失败了,你可以简单地return所有消息的失败状态,否则如果失败计数小于 10 那么你可以单独发送将失败的消息返回队列。
您需要以不同的方式设计您的应用程序,这里有一些想法不是最好的,但可以解决您的问题。
方案一:
- 创建 sqs 传递队列 - sq1
- 根据延迟要求sq2创建延迟队列
- 创建死信队列sdl
现在在 lambda 函数中,如果消息在 sq1 中失败,则在 sq1 上删除它并将其放在 sq2 上以重试任何异步调用的 Lambda 函数都会在事件被丢弃之前重试两次。如果重试失败。
如果重试后再次失败移入死信队列 sdl .
注意:当最初创建并启用 SQS 事件源映射时,或者在一段时间没有流量后首次出现时,Lambda 服务将开始使用五个并行长轮询连接轮询 SQS 队列,根据 AWS文档中,从 AWS Lambda 到 SQS 的长轮询的默认持续时间是 20 秒。
方案二:
使用 AWS StepFunction
StepFunction 将调用 lambda 并在需要时使用可配置的指数退避处理失败时的重试逻辑。
- https://docs.aws.amazon.com/step-functions/latest/dg/concepts-error-handling.html
- https://cloudacademy.com/blog/aws-step-functions-a-serverless-orchestrator/
**解决方案 3:**
CloudWatch 计划事件触发轮询 FAILED 的 Lambda 函数。
给定事件源的错误处理取决于 Lambda 的调用方式。 Amazon CloudWatch Events 异步调用您的 Lambda 函数。
- https://docs.aws.amazon.com/lambda/latest/dg/retries-on-errors.html
- https://engineering.opsgenie.com/aws-lambda-performance-series-part-2-an-analysis-on-async-lambda-fail-retry-behaviour-and-dead-b84620af406
- https://dzone.com/articles/asynchronous-retries-with-aws-sqs
- https://medium.com/@ron_73212/how-to-handle-aws-lambda-errors-like-a-pro-e5455b013d10
您必须在成功处理后以编程方式删除每条消息。
因此,如果任何消息失败,您可以将标志设置为 true,并且根据它,您可以在批量处理所有消息后引发错误,因此成功的消息将被删除,其他消息将根据重试策略。
所以按照下面的逻辑,只有失败和未处理的消息才会被重试。
import boto3
sqs = boto3.client("sqs")
def handler(event, context):
for message in event['records']:
queue_url = "form queue url recommended to set it as env variable"
message_body = message["body"]
print("do some processing :)")
message_receipt_handle = message["receiptHandle"]
sqs.delete_message(
QueueUrl=queue_url,
ReceiptHandle=message_receipt_handle
)
还有一种方法是将处理成功的消息id保存到变量中,根据消息id
进行batch删除操作response = client.delete_message_batch(
QueueUrl='string',
Entries=[
{
'Id': 'string',
'ReceiptHandle': 'string'
},
]
)
根据 AWS documentation,SQS 事件源映射现在支持开箱即用地处理部分故障。链接文章的要点如下:
- 在您的事件源映射配置中包含 ReportBatchItemFailures
- 失败时的响应语法必须修改为
{"batchItemFailures": [{"itemIdentifier": "id2"},{"itemIdentifier": "id4"}]}
,其中 id2 和 id4 是批处理中失败的 meesageIds - 按原样引用文档:
Lambda treats a batch as a complete success if your function returns any of the following:
An empty batchItemFailures list
A null batchItemFailures list
An empty EventResponse
A null EventResponse
Lambda treats a batch as a complete failure if your function returns any of the following:
An invalid JSON response
An empty string itemIdentifier
A null itemIdentifier
An itemIdentifier with a bad key name
An itemIdentifier value with a message ID that doesn't exist
根据documentation. But one of the AWS labs example points to its usage in SAM,SAM 支持尚不可用,但在测试时对我有效
AWS 支持部分批处理响应。这是打字稿代码的示例
type Result = {
itemIdentifier: string
status: 'failed' | 'success'
}
const isFulfilled = <T>(
result: PromiseFulfilledResult<T> | PromiseRejectedResult
): result is PromiseFulfilledResult<T> => result.status === 'fulfilled'
const isFailed = (
result: PromiseFulfilledResult<Result>
): result is PromiseFulfilledResult<
Omit<Result, 'status'> & { status: 'failed' }
> => result.value.status === 'failed'
const results = await Promise.allSettled(
sqsEvent.Records.map(async (record) => {
try {
return { status: 'success', itemIdentifier: record.messageId }
} catch(e) {
console.error(e);
return { status: 'failed', itemIdentifier: record.messageId }
}
})
)
return results
.filter(isFulfilled)
.filter(isFailed)
.map((result) => ({
itemIdentifier: result.value.itemIdentifier,
}))