使用管道数据查询 BigQuery apache_beam

Use pipeline data to query BigQuery apache_beam

我想使用管道中 运行 中的数据生成查询并在 BigQuery 上执行它。

假设我有这样的东西 python SQL 模板:

template = '''
SELECT
  email
FROM
  `project_id.dataset_id.table_id`
WHERE
  email = {runtime_email}
'''

我想以 runtime_email 源自管道数据(元素)的方式格式化此模板。

管道从 PubSub 读取变量 runtime_email 和电子邮件 example@test.com

然后我将执行如下操作:

with beam.Pipeline(options=options) as p:
    bq_results = (p
        | LoadDataFromPubSub()
        | beam.io.Read(
            beam.io.BigQuerySource(
                query=template.format(element['runtime_email']),
                use_standard_sql=True
            )
        )
    )

关于如何利用管道数据 运行 下一步有什么想法吗?

您构建管道的方式不正确。请记住,Beam 构建一个图形,然后执行它。

在这里,您定义了 2 个来源,1 个 PubSub,1 个 BigQuery。 BQ 源在管道启动之前初始化。顺便说一句,你的 runtime_email 将永远是 None.

您有 2 个解决方案:

  • 在开始您的管道之前阅读您的 PubSub。您可以在 python 代码中或在外部执行此操作,并在 pipeline_options 中提供数据。然后迭代所有 Pubsub 消息并构建尽可能多的 BQ 源。
  • 将您的 PubSub 源保留在管道中,并使用 python 库(而不是 beam)进行标准 BQ 调用以读取行。如果您想流式传输数据,这是推荐的方式。