将 SQS Lambda 批处理拆分为部分 success/partial 失败
Splittling SQS Lambda batch into partial success/partial failure
AWS SQS -> Lambda 集成允许您批量处理传入消息,您可以在其中配置可以在单个批次中接收的最大数量。如果您在处理期间抛出异常以指示失败,所有消息都不会从传入队列中删除,并且可以在可见性超时过后由另一个 lambda 拾取以进行处理。
出于性能原因,有没有办法保持批处理,但允许批处理中的一些消息成功(并从入站队列中删除)并且只保留一些未删除的批处理?
一种选择是手动将失败的消息发送回队列,然后将成功回复给 SQS,这样就不会出现重复。
你可以做一些事情,比如设置失败计数,这样如果所有消息都失败了,你可以简单地return所有消息的失败状态,否则如果失败计数小于 10(10 是最大值您可以从 SQS -> Lambda 事件中获得的批量大小)然后您可以将失败的消息单独发送回队列,然后回复成功消息。
此外,为避免任何可能的无限重试循环,在将事件发送回队列之前将 属性 添加到 "retry" 计数之类的事件,并在 [=16] 时丢弃事件=] 大于 X.
手动将失败的消息重新排队到队列的问题在于,您可能会进入无限循环,其中这些项目永远失败并重新排队并再次失败。由于它们被重新发送到队列中,因此它们的重试计数每次都会重置,这意味着它们永远不会失败进入死信队列。您还会失去可见性超时的好处。这也不利于监控目的,因为除非您手动检查日志,否则您永远无法知道自己是否处于不良状态。
更好的方法是手动删除 成功的 项,然后抛出异常以使其余批次失败。成功的项目将从队列中移除,所有实际失败的项目将达到其正常 visibility timeout
周期并保留其 receive count
值,您将能够实际使用和监控死信队列。与其他方法相比,这总体上也更少工作。
注意事项
- 仅在出现部分批处理失败时才覆盖默认行为。如果所有项目都成功,让默认行为顺其自然
- 由于您要跟踪每个队列项的失败情况,因此您需要捕获并记录每个出现的异常,以便您稍后可以看到发生了什么
我最近遇到了这个问题,在我们这边不编写任何代码的情况下处理这个问题的最佳方法是使用 EventSourceMapping 的 FunctionResponseTypes 属性。使用它我们只需要传递失败消息 Id 列表,事件源将处理删除成功消息。
请结帐Using SQS and Lambda
为 lambda 配置 Eventsource 的 Cloudformation 模板
"FunctionEventSourceMapping": {
"Type": "AWS::Lambda::EventSourceMapping",
"Properties": {
"BatchSize": "100",
"Enabled": "True",
"EventSourceArn": {"Fn::GetAtt": ["SQSQueue", "Arn"]},
"FunctionName": "FunctionName",
"MaximumBatchingWindowInSeconds": "100",
"FunctionResponseTypes": ["ReportBatchItemFailures"] # This is important
}
}
使用上述配置配置事件源后,它应该如下所示
然后我们只需要 return 来自我们的 lambda
的以下格式的响应
{"batchItemFailures": [{"itemIdentifier": "85f26da9-fceb-4252-9560-243376081199"}]}
在 batchIntemFailures 列表中提供失败消息 ID 的列表
如果您的 lambda 运行时环境在 python 中,请使用上述格式的 return dict 用于基于 java 的运行时,您可以使用 aws-lambda-java-event
示例 Python 代码
这种方法的优点是
- 您无需添加任何代码即可从 SQS 队列中手动删除消息
- 您不必为了从队列中删除消息而包括任何第三方库或 boto,这将帮助您减小最终工件的大小。
- 保持简单愚蠢
附带说明确保您的 lambda 对 sqs 具有获取和删除消息所需的权限。
谢谢
AWS SQS -> Lambda 集成允许您批量处理传入消息,您可以在其中配置可以在单个批次中接收的最大数量。如果您在处理期间抛出异常以指示失败,所有消息都不会从传入队列中删除,并且可以在可见性超时过后由另一个 lambda 拾取以进行处理。
出于性能原因,有没有办法保持批处理,但允许批处理中的一些消息成功(并从入站队列中删除)并且只保留一些未删除的批处理?
一种选择是手动将失败的消息发送回队列,然后将成功回复给 SQS,这样就不会出现重复。
你可以做一些事情,比如设置失败计数,这样如果所有消息都失败了,你可以简单地return所有消息的失败状态,否则如果失败计数小于 10(10 是最大值您可以从 SQS -> Lambda 事件中获得的批量大小)然后您可以将失败的消息单独发送回队列,然后回复成功消息。
此外,为避免任何可能的无限重试循环,在将事件发送回队列之前将 属性 添加到 "retry" 计数之类的事件,并在 [=16] 时丢弃事件=] 大于 X.
手动将失败的消息重新排队到队列的问题在于,您可能会进入无限循环,其中这些项目永远失败并重新排队并再次失败。由于它们被重新发送到队列中,因此它们的重试计数每次都会重置,这意味着它们永远不会失败进入死信队列。您还会失去可见性超时的好处。这也不利于监控目的,因为除非您手动检查日志,否则您永远无法知道自己是否处于不良状态。
更好的方法是手动删除 成功的 项,然后抛出异常以使其余批次失败。成功的项目将从队列中移除,所有实际失败的项目将达到其正常 visibility timeout
周期并保留其 receive count
值,您将能够实际使用和监控死信队列。与其他方法相比,这总体上也更少工作。
注意事项
- 仅在出现部分批处理失败时才覆盖默认行为。如果所有项目都成功,让默认行为顺其自然
- 由于您要跟踪每个队列项的失败情况,因此您需要捕获并记录每个出现的异常,以便您稍后可以看到发生了什么
我最近遇到了这个问题,在我们这边不编写任何代码的情况下处理这个问题的最佳方法是使用 EventSourceMapping 的 FunctionResponseTypes 属性。使用它我们只需要传递失败消息 Id 列表,事件源将处理删除成功消息。 请结帐Using SQS and Lambda
为 lambda 配置 Eventsource 的 Cloudformation 模板
"FunctionEventSourceMapping": {
"Type": "AWS::Lambda::EventSourceMapping",
"Properties": {
"BatchSize": "100",
"Enabled": "True",
"EventSourceArn": {"Fn::GetAtt": ["SQSQueue", "Arn"]},
"FunctionName": "FunctionName",
"MaximumBatchingWindowInSeconds": "100",
"FunctionResponseTypes": ["ReportBatchItemFailures"] # This is important
}
}
使用上述配置配置事件源后,它应该如下所示
然后我们只需要 return 来自我们的 lambda
的以下格式的响应{"batchItemFailures": [{"itemIdentifier": "85f26da9-fceb-4252-9560-243376081199"}]}
在 batchIntemFailures 列表中提供失败消息 ID 的列表 如果您的 lambda 运行时环境在 python 中,请使用上述格式的 return dict 用于基于 java 的运行时,您可以使用 aws-lambda-java-event
示例 Python 代码
这种方法的优点是
- 您无需添加任何代码即可从 SQS 队列中手动删除消息
- 您不必为了从队列中删除消息而包括任何第三方库或 boto,这将帮助您减小最终工件的大小。
- 保持简单愚蠢
附带说明确保您的 lambda 对 sqs 具有获取和删除消息所需的权限。
谢谢