使用一个 pcollection 作为另一个 pcollection 的输入
Use a pcollection as input of another pcollection
在 google 数据流中使用 python sdk,我想做这样的查询:
query_a_and_b = "SELECT a, b FROM TableA"
此查询return是我想用来执行更多查询的元组列表:
query_param = SELECT * from TableA WHERE a = {} and b = {}.format(a, b)
(here i set TableA but it will also be used with TableB, C and D that are inner joined with TableA...)
所以我想做的是:
coll = (p
| 'read a_b_tuples' >> beam.io.Read(beam.io.BigQuerySource(query=query_a_and_b, use_standard_sql=True))
| 'Build SQL' >> beam.Map(lambda x: query_param.format(x['a'], x['b']))
| 'Query pardo' >> beam.ParDo(lambda q: [beam.io.Read(beam.io.BigQuerySource(query=q, use_standard_sql=True))])
| 'Save' >> beam.io.WriteToText('results.csv')
)
我不确定最好的方法,但它行不通。在数据流中实现此目的的首选方法是什么?
最终,这些查询中的每一个都会 return 少量的行(小于 5k),我想在 filtering/processing 的 pandas 数据框中加载这些行,然后将每个元组 (a,b) 的所有 TableA、B、C、D 合并,并将每个元组数据场写入 csv 文件作为结果。
从某种意义上说,我可能错误地缩小了问题的地图,我可以使用光束函数按 a 和 b 进行分组,然后进行处理...?
Beam 尚不直接支持 BigQuery。其他一些转换支持类似的用例,例如JdbcIO.readAll()
可以在数据库中查询一组查询参数,TextIO.readAll()
可以读取一组文件名 - 但 BigQueryIO
还没有这样做,无论是在 Java 还是Python 个 SDK。
在您的 "Query pardo" 中,您可以显式地与 BigQuery REST API 交谈 - 应该没问题,因为您的查询 return 的结果数量很少。
在 google 数据流中使用 python sdk,我想做这样的查询:
query_a_and_b = "SELECT a, b FROM TableA"
此查询return是我想用来执行更多查询的元组列表:
query_param = SELECT * from TableA WHERE a = {} and b = {}.format(a, b) (here i set TableA but it will also be used with TableB, C and D that are inner joined with TableA...)
所以我想做的是:
coll = (p
| 'read a_b_tuples' >> beam.io.Read(beam.io.BigQuerySource(query=query_a_and_b, use_standard_sql=True))
| 'Build SQL' >> beam.Map(lambda x: query_param.format(x['a'], x['b']))
| 'Query pardo' >> beam.ParDo(lambda q: [beam.io.Read(beam.io.BigQuerySource(query=q, use_standard_sql=True))])
| 'Save' >> beam.io.WriteToText('results.csv')
)
我不确定最好的方法,但它行不通。在数据流中实现此目的的首选方法是什么?
最终,这些查询中的每一个都会 return 少量的行(小于 5k),我想在 filtering/processing 的 pandas 数据框中加载这些行,然后将每个元组 (a,b) 的所有 TableA、B、C、D 合并,并将每个元组数据场写入 csv 文件作为结果。
从某种意义上说,我可能错误地缩小了问题的地图,我可以使用光束函数按 a 和 b 进行分组,然后进行处理...?
Beam 尚不直接支持 BigQuery。其他一些转换支持类似的用例,例如JdbcIO.readAll()
可以在数据库中查询一组查询参数,TextIO.readAll()
可以读取一组文件名 - 但 BigQueryIO
还没有这样做,无论是在 Java 还是Python 个 SDK。
在您的 "Query pardo" 中,您可以显式地与 BigQuery REST API 交谈 - 应该没问题,因为您的查询 return 的结果数量很少。