如何在“AWS Step Functions”中共享数据而不在步骤之间传递数据

How to share data in `AWS Step Functions` without passing it between the steps

我使用 AWS Step Functions 并有以下工作流程

initStep - 这是一个 lambda 函数处理程序,它获取一些数据并将其发送到 SQS 以供外部服务。

activity = os.getenv('ACTIVITY')
queue_name = os.getenv('QUEUE_NAME')

def lambda_handler(event, context):
  event['my_activity'] = activity
  data = json.dumps(event)

  # Retrieving a queue by its name
  sqs = boto3.resource('sqs')
  queue = sqs.get_queue_by_name(QueueName=queue_name)

  queue.send_message(MessageBody=data, MessageGroupId='messageGroup1' + str(datetime.time(datetime.now())))

  return event

validationWaiting - 这是一个 activity 等待包含数据的外部服务的应答。

complete - 这是一个 lambda 函数处理程序,它使用来自 initStep.

的数据
def lambda_handler(event, context):
  email = event['email'] if 'email' in event else None
  data = event['data'] if 'data' in event else None

  client = boto3.client(service_name='ses')
  to = email.split(', ')
  message_conrainer = {'Subject': {'Data': 'Email from step functions'},
           'Body': {'Html': {
               'Charset': "UTF-8",
               'Data': """<html><body>
                            <p>""" + data """</p>
                            </body> </html> """
           }}}

  destination = {'ToAddresses': to,
               'CcAddresses': [],
               'BccAddresses': []}

  return client.send_email(Source=from_addresses,
                         Destination=destination,
                         Message=message_container)

它确实有效,但问题是我正在将完整数据从 initStep 发送到外部服务,只是为了稍后将其传递给 complete。可能会添加更多步骤。

我认为最好将其作为某种全局数据(当前步骤函数)进行共享,这样我就可以添加或删除步骤,并且数据仍然可供所有人使用。

您可以使用 InputPathResultPath。在 initStep 中,您只会将必要的数据发送到外部服务(可能连同一些唯一的执行标识符)。在 ValidaitonWaiting 步骤中,您可以设置以下属性(在状态机定义中):

  • InputPath:将向GetActivityTask提供什么数据。可能您想将其设置为类似 $.execution_unique_id 的内容,其中 execution_unique_id 是外部服务用于识别执行的数据字段(以将其与 initStep 期间的特定请求相匹配)。
  • ResultPath:ValidationWaiting Activity 的输出将保存在数据中。您可以将其设置为 $.validation_output 并且来自外部服务的 json 结果将出现在那里。

通过这种方式,您可以仅向外部服务发送它实际需要的数据,并且您不会失去对输入中之前(ValidationWaiting 步骤之前)的任何数据的访问权。

例如,您可以定义以下状态机:

{
  "StartAt": "initStep",
  "States": {
    "initStep": {
      "Type": "Pass",
      "Result": {
        "executionId": "some:special:id",
        "data": {},
        "someOtherData": {"value": "key"}
      },
      "Next": "ValidationWaiting"
    },
    "ValidationWaiting": {
      "Type": "Pass",
      "InputPath": "$.executionId",
      "ResultPath": "$.validationOutput",
      "Result": {
        "validationMessages": ["a", "b"]
      },
      "Next": "Complete"
    },
    "Complete": {
      "Type": "Pass",
      "End": true
    }
  }
}

我为 initStepValidationWaiting 使用了 Pass 状态来简化示例(我没有 运行 它,但它应该可以工作)。 Result 字段特定于 Pass 任务,它等同于您的 Lambda 函数或 Activity.

的结果

在这种情况下 Complete 步骤将获得以下输入:

{
  "executionId": "some:special:id",
  "data": {},
  "someOtherData": {"value": key"},
  "validationOutput": {
    "validationMessages": ["a", "b"]
  }
}

因此 ValidationWaiting 步骤的结果已保存到 validationOutput 字段中。

这里是使用 InputPath 和 ResultPath 的简短解决方案。我的 Lambda Check_Ubuntu_Updates return 准备更新的实例列表。此实例列表由步骤 Notify_Results 接收,然后它使用此数据。请记住,如果您的 Step Function 中有多个 ResultPath,并且您在一个步骤中需要超过 1 个输入,您只能将 InputPath 与 $.

一起使用
{
  "Comment": "A state machine that check some updates systems available.",
  "StartAt": "Check_Ubuntu_Updates",
  "States": {
    "Check_Ubuntu_Updates": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:#############:function:Check_Ubuntu_Updates",
      "ResultPath": "$.instances",
      "Next": "Notify_Results"
    },
    "Notify_Results": {
      "Type": "Task",
      "InputPath": "$.instances",
      "Resource": "arn:aws:lambda:us-east-1:#############:function:Notify_Results",
      "End": true
    }
  }
}

根据 的回答,我想出了自己的解决方案。

我需要使用 Type: Task,因为 initStep 是一个发送 SQS 的 lambda。

我在ValidationWaiting中不需要InputPath,只需要ResultPath,它存储在activity中收到的数据。

我使用 Serverless 框架,这是我的最终解决方案:

StartAt: initStep
States: 
  initStep:
    Type: Task
    Resource: arn:aws:lambda:#{AWS::Region}:#{AWS::AccountId}:function:init-step
    Next: ValidationWaiting
  ValidationWaiting:
    Type: Task
    ResultPath: $.validationOutput
    Resource: arn:aws:states:#{AWS::Region}:#{AWS::AccountId}:activity:validationActivity
    Next: Complete
    Catch:
      - ErrorEquals:
        - States.ALL
      ResultPath: $.validationOutput
      Next: Complete
  Complete:
    Type: Task
    Resource: arn:aws:lambda:#{AWS::Region}:#{AWS::AccountId}:function:complete-step
    End: true