Fork/Join on StepFunction Map Steps with failures

I have a step function that starts with many parallel steps (each one a Lambda invocation), followed by a Finalize step that does some final processing.

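It can be visualized here (a redacted version of the state machine definition is also given below). I know you can add try/catch logic around the parallel step, but if my understanding is correct, that would not stop the other parallel steps from proceeding, nor would it send them all to a different state.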

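Ideally, if any parallel step fails for any reason, all current (and future) steps would be cancelled, and the execution would never reach the Finalize step. Instead it would move to a third, different state (call it Error Recovery). Is this workflow possible? If so, is it guaranteed that all the Parallel Steps have stopped before the Recovery state is entered?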

Step function definition

{
  "Comment": "An example of the Amazon States Language using a map state to process elements of an array with a max concurrency of 2.",
  "StartAt": "Map",
  "States": {
    "Map": {
      "Type": "Map",
      "ItemsPath": "$.items",
      "Parameters": {
        ...
      },
      "MaxConcurrency": 2,
      "Next": "Finalize",
      "Iterator": {
        "StartAt": "Parallel Step",
        "States": {
          "Parallel Step": {
            "Type": "Task",
            "Resource": "arn:aws:states:::lambda:invoke",
            "Parameters": {
              "FunctionName": "arn:aws:lambda:us-east-1:<>:function:lambda-parallel-step:$LATEST",
              "Payload": {
                "Input.$": "$"
              }
            },
            "OutputPath": "$.Payload",
            "End": true
          }
        }
      }
    },
    "Finalize": {
      "Type": "Pass",
      "End": true
    }
  }
}

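Coming back to this, the answer turned out to be simpler than I expected. You can put a Catch on the entire Map state shown above. If anything inside it raises an uncaught error, execution moves to whatever state that Catch clause points to.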

Modifying my original definition slightly:

{
  "Comment": "Pipeline to read data from S3 and index into Elasticsearch",
  "StartAt": "Map",
  "States": {
    "Map": {
      "Type": "Map",
      "ItemsPath": "$.items",
      "Parameters": {
        ...
      },
      "ResultPath": "$.parallel-output",
      "MaxConcurrency": 6,
      "Next": "Finalize",
      "Iterator": {
        "StartAt": "Parallel",
        "States": {
          "Parallel": {
            "Type": "Task",
            "Resource": "arn:aws:states:::lambda:invoke",
            "Parameters": {
              "FunctionName": "arn:aws:lambda:us-east-1:<>:function:parallel:$LATEST",
              "Payload": {
                "Input.$": "$"
              }
            },
            "OutputPath": "$.Payload",
            "End": true
          }
        }
      },
      "Catch": [ {"ErrorEquals": ["States.ALL"], "ResultPath": "$.error-info", "Next": "Cleanup State"}]
    },
    "Finalize": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "FunctionName": "arn:aws:lambda:us-east-1:<>:function:finalize:$LATEST",
        "Payload": {
          "Input.$": "$"
        }
      },
      "End": true
    },
    "Cleanup State": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "FunctionName": "arn:aws:lambda:us-east-1:<>:function:cleanup:$LATEST",
        "Payload": {
          "Input.$": "$"
        }
      },
      "Next": "Fail State"
    },
    "Fail State": {
      "Type": "Fail",
      "Error": "ErrorCode",
      "Cause": "Caused By Message"
    }
  }
}
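
For illustration, this is roughly the state input that Cleanup State would see with the Catch above. Because the catcher sets ResultPath to $.error-info, the error output (an object with Error and Cause fields) is added under that key while the original input is kept. The items and the error values below are made-up placeholders, not output from a real run.

```json
{
  "items": [
    { "id": "placeholder-item-1" },
    { "id": "placeholder-item-2" }
  ],
  "error-info": {
    "Error": "PlaceholderError",
    "Cause": "Placeholder cause text reported by the failed iteration"
  }
}
```

Note that $.parallel-output is absent here: the Map state's own ResultPath only applies when the Map state succeeds.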

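In this example the Catch fires on any failure, but the documentation explains how to do more sophisticated error handling for different error types, retries, and so on.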
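
As a rough sketch of that more selective handling, the Map state could retry transient Lambda errors inside each iteration and route timeouts to a different state than everything else. This is only an assumption about how one might set it up, not the configuration I actually run: "Timeout Cleanup" is a hypothetical state that would have to be defined alongside "Cleanup State", the retry numbers are arbitrary, and the Map state's Parameters are left out because they are elided above.

```json
{
  "Map": {
    "Type": "Map",
    "Comment": "Hypothetical sketch: the retry values and the Timeout Cleanup state are assumptions",
    "ItemsPath": "$.items",
    "ResultPath": "$.parallel-output",
    "MaxConcurrency": 6,
    "Next": "Finalize",
    "Iterator": {
      "StartAt": "Parallel",
      "States": {
        "Parallel": {
          "Type": "Task",
          "Resource": "arn:aws:states:::lambda:invoke",
          "Parameters": {
            "FunctionName": "arn:aws:lambda:us-east-1:<>:function:parallel:$LATEST",
            "Payload": {
              "Input.$": "$"
            }
          },
          "Retry": [
            {
              "ErrorEquals": ["Lambda.ServiceException", "Lambda.TooManyRequestsException"],
              "IntervalSeconds": 2,
              "MaxAttempts": 3,
              "BackoffRate": 2.0
            }
          ],
          "OutputPath": "$.Payload",
          "End": true
        }
      }
    },
    "Catch": [
      {
        "ErrorEquals": ["States.Timeout"],
        "ResultPath": "$.error-info",
        "Next": "Timeout Cleanup"
      },
      {
        "ErrorEquals": ["States.ALL"],
        "ResultPath": "$.error-info",
        "Next": "Cleanup State"
      }
    ]
  }
}
```

Catchers are evaluated in order, so the specific States.Timeout entry comes first. States.ALL has to appear alone in its ErrorEquals list and as the last catcher. The Retry on the inner task is exhausted before the error propagates up to the Catch on the Map state.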

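The DAG for the definition with the single Catch looks like this: Map goes to Finalize on success, or to Cleanup State and then Fail State when an error is caught.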