Fork/Join on Step Functions Map steps with failures
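I have a step function that starts with many parallel steps (each parallel step is a Lambda invocation), followed by a Finalize step that does some final processing.
It can be visualized here (a redacted version of the state machine definition is also given below). I know you can add try/catch logic around the parallel steps, but if my understanding is correct, that won't stop the other parallel steps from continuing, and it won't send them all to a different state.
Ideally, if any parallel step fails for any reason, all current steps (and future steps) would be cancelled and none of them would ever reach the Finalize stage; instead, execution would move to a third, different state (call it error recovery). Is this workflow possible? If so, is it guaranteed that all of the parallel steps have stopped before the Recovery state is entered?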
State machine definition:
{
  "Comment": "An example of the Amazon States Language using a map state to process elements of an array with a max concurrency of 2.",
  "StartAt": "Map",
  "States": {
    "Map": {
      "Type": "Map",
      "ItemsPath": "$.items",
      "Parameters": {
        ...
      },
      "MaxConcurrency": 2,
      "Next": "Finalize",
      "Iterator": {
        "StartAt": "Parallel Step",
        "States": {
          "Parallel Step": {
            "Type": "Task",
            "Resource": "arn:aws:states:::lambda:invoke",
            "Parameters": {
              "FunctionName": "arn:aws:lambda:us-east-1:<>:function:lambda-parallel-step:$LATEST",
              "Payload": {
                "Input.$": "$"
              }
            },
            "OutputPath": "$.Payload",
            "End": true
          }
        }
      }
    },
    "Finalize": {
      "Type": "Pass",
      "End": true
    }
  }
}
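Coming back to this, the answer was simpler than I expected: you can put a Catch on the entire Map shown above. If anything inside it raises an uncaught error, execution does whatever that Catch statement specifies.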
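To make the semantics of a Catch on the whole Map concrete, here is a minimal local sketch in Python using asyncio. It is only an analogy, not how Step Functions is implemented: run_map_with_catch and fake_iteration are hypothetical names, the semaphore stands in for MaxConcurrency, and the returned "next" value stands in for the transition to Finalize or Cleanup State. When one iteration raises, every other iteration that is still running or still waiting for a slot is cancelled and awaited before the recovery route is returned.

```python
import asyncio
from typing import Awaitable, Callable, Dict, Sequence


async def run_map_with_catch(
    items: Sequence[int],
    iteration: Callable[[int], Awaitable[int]],
    max_concurrency: int,
) -> Dict[str, object]:
    """Hypothetical model of a Map state whose Catch routes to "Cleanup State"."""
    slots = asyncio.Semaphore(max_concurrency)  # stands in for MaxConcurrency

    async def guarded(item: int) -> int:
        async with slots:
            return await iteration(item)

    tasks = [asyncio.create_task(guarded(item)) for item in items]
    try:
        # gather() propagates the first exception but does NOT cancel the rest.
        results = await asyncio.gather(*tasks)
    except Exception as exc:
        # Fail fast: cancel iterations that are running or waiting for a slot,
        # then wait until every task has actually finished before "catching".
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
        return {
            "next": "Cleanup State",
            "error-info": repr(exc),
            "all_stopped": all(task.done() for task in tasks),
        }
    return {"next": "Finalize", "parallel-output": list(results)}


async def fake_iteration(item: int) -> int:
    """Hypothetical stand-in for the Lambda call; item 3 always fails."""
    await asyncio.sleep(0.01 * item)
    if item == 3:
        raise RuntimeError(f"item {item} failed")
    return item * 10


print(asyncio.run(run_map_with_catch([1, 2, 4], fake_iteration, 2)))
# {'next': 'Finalize', 'parallel-output': [10, 20, 40]}
print(asyncio.run(run_map_with_catch([1, 2, 3, 4, 5], fake_iteration, 2)))
```

Cancelling an asyncio task only interrupts it at its next await, so this sketch says nothing about whether a Lambda invocation that is already running gets terminated when Step Functions stops the remaining Map iterations; that part of the question is worth confirming against the Step Functions documentation for Map state failures.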
Here is my definition, slightly modified:
{
  "Comment": "Pipeline to read data from S3 and index into Elasticsearch",
  "StartAt": "Map",
  "States": {
    "Map": {
      "Type": "Map",
      "ItemsPath": "$.items",
      "Parameters": {
        ...
      },
      "ResultPath": "$.parallel-output",
      "MaxConcurrency": 6,
      "Next": "Finalize",
      "Iterator": {
        "StartAt": "Parallel",
        "States": {
          "Parallel": {
            "Type": "Task",
            "Resource": "arn:aws:states:::lambda:invoke",
            "Parameters": {
              "FunctionName": "arn:aws:lambda:us-east-1:<>:function:parallel:$LATEST",
              "Payload": {
                "Input.$": "$"
              }
            },
            "OutputPath": "$.Payload",
            "End": true
          }
        }
      },
      "Catch": [
        {
          "ErrorEquals": ["States.ALL"],
          "ResultPath": "$.error-info",
          "Next": "Cleanup State"
        }
      ]
    },
    "Finalize": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "FunctionName": "arn:aws:lambda:us-east-1:<>:function:finalize:$LATEST",
        "Payload": {
          "Input.$": "$"
        }
      },
      "End": true
    },
    "Cleanup State": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "FunctionName": "arn:aws:lambda:us-east-1:<>:function:cleanup:$LATEST",
        "Payload": {
          "Input.$": "$"
        }
      },
      "Next": "Fail State"
    },
    "Fail State": {
      "Type": "Fail",
      "Error": "ErrorCode",
      "Cause": "Caused By Message"
    }
  }
}
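The Catch only helps if its Next names a state that actually exists, so when editing a definition like the one above a quick structural check can catch typos in state names. This is a hypothetical helper, not part of any AWS SDK, and it is deliberately limited: it checks StartAt, Next and Catch targets, recurses into Map iterators (the older Iterator field or the newer ItemProcessor field), and ignores Choice rules and Parallel branches. The trimmed definition mirrors the structure above with the Parameters blocks left out.

```python
import copy
from typing import Any, Dict, List


def check_transitions(machine: Dict[str, Any], scope: str = "root") -> List[str]:
    """Return a list of problems with StartAt, Next and Catch targets."""
    problems: List[str] = []
    states = machine.get("States", {})
    start = machine.get("StartAt")
    if start not in states:
        problems.append(f"{scope}: StartAt {start!r} is not a state")
    for name, state in states.items():
        # Transitions are only valid within the same States block.
        targets = [state["Next"]] if "Next" in state else []
        targets += [catcher["Next"] for catcher in state.get("Catch", [])]
        for target in targets:
            if target not in states:
                problems.append(f"{scope}/{name}: unknown target {target!r}")
        # A Map state carries its own nested state machine.
        for key in ("Iterator", "ItemProcessor"):
            if key in state:
                problems += check_transitions(state[key], f"{scope}/{name}")
    return problems


# Trimmed copy of the modified definition (Parameters and Resource omitted).
definition = {
    "StartAt": "Map",
    "States": {
        "Map": {
            "Type": "Map",
            "Next": "Finalize",
            "Iterator": {
                "StartAt": "Parallel",
                "States": {"Parallel": {"Type": "Task", "End": True}},
            },
            "Catch": [
                {"ErrorEquals": ["States.ALL"], "ResultPath": "$.error-info", "Next": "Cleanup State"}
            ],
        },
        "Finalize": {"Type": "Task", "End": True},
        "Cleanup State": {"Type": "Task", "Next": "Fail State"},
        "Fail State": {"Type": "Fail", "Error": "ErrorCode", "Cause": "Caused By Message"},
    },
}

print(check_transitions(definition))  # []

broken = copy.deepcopy(definition)
broken["States"]["Cleanup State"]["Next"] = "FailState"  # typo on purpose
print(check_transitions(broken))  # ["root/Cleanup State: unknown target 'FailState'"]
```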
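In this example the Catch handles every failure the same way (States.ALL), but the documentation explains how to do more elaborate error handling for different error types, retries, and so on.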
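As one example of the finer-grained handling the documentation covers, the sketch below builds a hypothetical Retry list for the inner Parallel task and a Catch list for the Map, then checks the Amazon States Language rule that States.ALL must appear alone in its ErrorEquals array and only in the last retrier or catcher. The error names are standard Step Functions and Lambda error names, but the policy itself, the helper check_error_rules, and the Timeout Cleanup state are assumptions for illustration; that state would have to be added to the definition. A retrier on the inner task retries only that item's Lambda call; anything still failing after the retries fails the Map and is routed by the Map's catchers (assuming the iteration's error name is what the Map-level catchers see).

```python
import json
from typing import Any, Dict, List


def check_error_rules(rules: List[Dict[str, Any]]) -> None:
    """Raise ValueError if States.ALL is not alone and in the last rule."""
    for index, rule in enumerate(rules):
        errors = rule["ErrorEquals"]
        if "States.ALL" in errors:
            if len(errors) != 1:
                raise ValueError("States.ALL must appear alone in ErrorEquals")
            if index != len(rules) - 1:
                raise ValueError("States.ALL must be in the last rule")


# Hypothetical policy for the inner "Parallel" task: retry transient Lambda
# errors with exponential backoff (2 s, 4 s, 8 s) before the iteration fails.
inner_retry = [
    {
        "ErrorEquals": ["Lambda.ServiceException", "Lambda.TooManyRequestsException"],
        "IntervalSeconds": 2,
        "MaxAttempts": 3,
        "BackoffRate": 2.0,
    }
]

# Hypothetical catchers for the Map: timeouts go to an assumed "Timeout Cleanup"
# state, everything else goes to the existing "Cleanup State".
map_catch = [
    {"ErrorEquals": ["States.Timeout"], "ResultPath": "$.error-info", "Next": "Timeout Cleanup"},
    {"ErrorEquals": ["States.ALL"], "ResultPath": "$.error-info", "Next": "Cleanup State"},
]

check_error_rules(inner_retry)
check_error_rules(map_catch)
print(json.dumps({"Retry": inner_retry, "Catch": map_catch}, indent=2))
```

Ordering matters because catchers are evaluated in order, which is why the catch-all States.ALL entry has to come last.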
[Figure: DAG of the modified state machine]