如何使用云功能触发数据流? (Python 开发工具包)

How to trigger a dataflow with a cloud function? (Python SDK)

我有一个由云触发的云函数Pub/Sub。我想要使​​用 Python SDK 的相同功能触发数据流。这是我的代码:

import base64
def hello_pubsub(event, context):   
    if 'data' in event:
        message = base64.b64decode(event['data']).decode('utf-8')
    else:
        message = 'hello world!'
    print('Message of pubsub : {}'.format(message))

我这样部署函数:

gcloud beta functions deploy hello_pubsub  --runtime python37 --trigger-topic topic1

您可以使用 Cloud Dataflow templates 来启动您的工作。您将需要编写以下步骤的代码:

  • 检索凭据
  • 生成 Dataflow 服务实例
  • 获取 GCP PROJECT_ID
  • 生成模板正文
  • 执行模板

这是一个使用您的基本代码的示例(随意拆分为多个方法以减少 hello_pubsub 方法中的代码)。

from googleapiclient.discovery import build
import base64
import google.auth
import os

def hello_pubsub(event, context):   
    if 'data' in event:
        message = base64.b64decode(event['data']).decode('utf-8')
    else:
        message = 'hello world!'

    credentials, _ = google.auth.default()
    service = build('dataflow', 'v1b3', credentials=credentials)
    gcp_project = os.environ["GCLOUD_PROJECT"]

    template_path = gs://template_file_path_on_storage/
    template_body = {
        "parameters": {
            "keyA": "valueA",
            "keyB": "valueB",
        },
        "environment": {
            "envVariable": "value"
        }
    }

    request = service.projects().templates().launch(projectId=gcp_project, gcsPath=template_path, body=template_body)
    response = request.execute()

    print(response)

在 template_body 变量中,参数值是将发送到您的管道的参数,数据流服务(服务帐户、工作人员和网络配置)使用环境值。

LaunchTemplateParameters documentation

RuntimeEnvironment documentation

您必须将您的管道 python 代码嵌入到您的函数中。当你的函数被调用时,你只需调用管道 python main 函数,它会在你的文件中执行管道。

如果您在 Cloud Shell 中开发并试用了您的管道,并且您已经在 Dataflow 管道中 运行 它,您的代码应该具有以下结构:

def run(argv=None, save_main_session=True):
  # Parse argument
  # Set options
  # Start Pipeline in p variable
  # Perform your transform in Pipeline
  # Run your Pipeline
  result = p.run()
  # Wait the end of the pipeline
  result.wait_until_finish()

因此,使用正确的参数调用此函数,尤其是 runner=DataflowRunner 以允许 python 代码在 Dataflow 服务中加载管道。

删除末尾的 result.wait_until_finish(),因为您的函数不会在所有数据流过程中存活很长时间。

如果需要,您也可以使用模板。