如何使用云功能触发数据流? (Python 开发工具包)
How to trigger a dataflow with a cloud function? (Python SDK)
我有一个由云触发的云函数Pub/Sub。我想要使用 Python SDK 的相同功能触发数据流。这是我的代码:
import base64
def hello_pubsub(event, context):
if 'data' in event:
message = base64.b64decode(event['data']).decode('utf-8')
else:
message = 'hello world!'
print('Message of pubsub : {}'.format(message))
我这样部署函数:
gcloud beta functions deploy hello_pubsub --runtime python37 --trigger-topic topic1
您可以使用 Cloud Dataflow templates 来启动您的工作。您将需要编写以下步骤的代码:
- 检索凭据
- 生成 Dataflow 服务实例
- 获取 GCP PROJECT_ID
- 生成模板正文
- 执行模板
这是一个使用您的基本代码的示例(随意拆分为多个方法以减少 hello_pubsub 方法中的代码)。
from googleapiclient.discovery import build
import base64
import google.auth
import os
def hello_pubsub(event, context):
if 'data' in event:
message = base64.b64decode(event['data']).decode('utf-8')
else:
message = 'hello world!'
credentials, _ = google.auth.default()
service = build('dataflow', 'v1b3', credentials=credentials)
gcp_project = os.environ["GCLOUD_PROJECT"]
template_path = gs://template_file_path_on_storage/
template_body = {
"parameters": {
"keyA": "valueA",
"keyB": "valueB",
},
"environment": {
"envVariable": "value"
}
}
request = service.projects().templates().launch(projectId=gcp_project, gcsPath=template_path, body=template_body)
response = request.execute()
print(response)
在 template_body 变量中,参数值是将发送到您的管道的参数,数据流服务(服务帐户、工作人员和网络配置)使用环境值。
您必须将您的管道 python 代码嵌入到您的函数中。当你的函数被调用时,你只需调用管道 python main 函数,它会在你的文件中执行管道。
如果您在 Cloud Shell 中开发并试用了您的管道,并且您已经在 Dataflow 管道中 运行 它,您的代码应该具有以下结构:
def run(argv=None, save_main_session=True):
# Parse argument
# Set options
# Start Pipeline in p variable
# Perform your transform in Pipeline
# Run your Pipeline
result = p.run()
# Wait the end of the pipeline
result.wait_until_finish()
因此,使用正确的参数调用此函数,尤其是 runner=DataflowRunner
以允许 python 代码在 Dataflow 服务中加载管道。
删除末尾的 result.wait_until_finish()
,因为您的函数不会在所有数据流过程中存活很长时间。
如果需要,您也可以使用模板。
我有一个由云触发的云函数Pub/Sub。我想要使用 Python SDK 的相同功能触发数据流。这是我的代码:
import base64
def hello_pubsub(event, context):
if 'data' in event:
message = base64.b64decode(event['data']).decode('utf-8')
else:
message = 'hello world!'
print('Message of pubsub : {}'.format(message))
我这样部署函数:
gcloud beta functions deploy hello_pubsub --runtime python37 --trigger-topic topic1
您可以使用 Cloud Dataflow templates 来启动您的工作。您将需要编写以下步骤的代码:
- 检索凭据
- 生成 Dataflow 服务实例
- 获取 GCP PROJECT_ID
- 生成模板正文
- 执行模板
这是一个使用您的基本代码的示例(随意拆分为多个方法以减少 hello_pubsub 方法中的代码)。
from googleapiclient.discovery import build
import base64
import google.auth
import os
def hello_pubsub(event, context):
if 'data' in event:
message = base64.b64decode(event['data']).decode('utf-8')
else:
message = 'hello world!'
credentials, _ = google.auth.default()
service = build('dataflow', 'v1b3', credentials=credentials)
gcp_project = os.environ["GCLOUD_PROJECT"]
template_path = gs://template_file_path_on_storage/
template_body = {
"parameters": {
"keyA": "valueA",
"keyB": "valueB",
},
"environment": {
"envVariable": "value"
}
}
request = service.projects().templates().launch(projectId=gcp_project, gcsPath=template_path, body=template_body)
response = request.execute()
print(response)
在 template_body 变量中,参数值是将发送到您的管道的参数,数据流服务(服务帐户、工作人员和网络配置)使用环境值。
您必须将您的管道 python 代码嵌入到您的函数中。当你的函数被调用时,你只需调用管道 python main 函数,它会在你的文件中执行管道。
如果您在 Cloud Shell 中开发并试用了您的管道,并且您已经在 Dataflow 管道中 运行 它,您的代码应该具有以下结构:
def run(argv=None, save_main_session=True):
# Parse argument
# Set options
# Start Pipeline in p variable
# Perform your transform in Pipeline
# Run your Pipeline
result = p.run()
# Wait the end of the pipeline
result.wait_until_finish()
因此,使用正确的参数调用此函数,尤其是 runner=DataflowRunner
以允许 python 代码在 Dataflow 服务中加载管道。
删除末尾的 result.wait_until_finish()
,因为您的函数不会在所有数据流过程中存活很长时间。
如果需要,您也可以使用模板。