使用管道数据查询 BigQuery apache_beam
Use pipeline data to query BigQuery apache_beam
我想使用管道中 运行 中的数据生成查询并在 BigQuery 上执行它。
假设我有这样的东西 python SQL 模板:
template = '''
SELECT
email
FROM
`project_id.dataset_id.table_id`
WHERE
email = {runtime_email}
'''
我想以 runtime_email
源自管道数据(元素)的方式格式化此模板。
管道从 PubSub 读取变量 runtime_email
和电子邮件 example@test.com
然后我将执行如下操作:
with beam.Pipeline(options=options) as p:
bq_results = (p
| LoadDataFromPubSub()
| beam.io.Read(
beam.io.BigQuerySource(
query=template.format(element['runtime_email']),
use_standard_sql=True
)
)
)
关于如何利用管道数据 运行 下一步有什么想法吗?
您构建管道的方式不正确。请记住,Beam 构建一个图形,然后执行它。
在这里,您定义了 2 个来源,1 个 PubSub,1 个 BigQuery。 BQ 源在管道启动之前初始化。顺便说一句,你的 runtime_email
将永远是 None
.
您有 2 个解决方案:
- 在开始您的管道之前阅读您的 PubSub。您可以在 python 代码中或在外部执行此操作,并在
pipeline_options
中提供数据。然后迭代所有 Pubsub 消息并构建尽可能多的 BQ 源。
- 将您的 PubSub 源保留在管道中,并使用 python 库(而不是 beam)进行标准 BQ 调用以读取行。如果您想流式传输数据,这是推荐的方式。
我想使用管道中 运行 中的数据生成查询并在 BigQuery 上执行它。
假设我有这样的东西 python SQL 模板:
template = '''
SELECT
email
FROM
`project_id.dataset_id.table_id`
WHERE
email = {runtime_email}
'''
我想以 runtime_email
源自管道数据(元素)的方式格式化此模板。
管道从 PubSub 读取变量 runtime_email
和电子邮件 example@test.com
然后我将执行如下操作:
with beam.Pipeline(options=options) as p:
bq_results = (p
| LoadDataFromPubSub()
| beam.io.Read(
beam.io.BigQuerySource(
query=template.format(element['runtime_email']),
use_standard_sql=True
)
)
)
关于如何利用管道数据 运行 下一步有什么想法吗?
您构建管道的方式不正确。请记住,Beam 构建一个图形,然后执行它。
在这里,您定义了 2 个来源,1 个 PubSub,1 个 BigQuery。 BQ 源在管道启动之前初始化。顺便说一句,你的 runtime_email
将永远是 None
.
您有 2 个解决方案:
- 在开始您的管道之前阅读您的 PubSub。您可以在 python 代码中或在外部执行此操作,并在
pipeline_options
中提供数据。然后迭代所有 Pubsub 消息并构建尽可能多的 BQ 源。 - 将您的 PubSub 源保留在管道中,并使用 python 库(而不是 beam)进行标准 BQ 调用以读取行。如果您想流式传输数据,这是推荐的方式。