数据流:使用 python 管道更新 BigQuery 行
Dataflow: update BigQuery rows with python pipeline
想象一个简单的 Google 数据流管道。在此管道中,您使用 apache beam 函数从 BQ 读取,并且根据返回的 pcollection,您必须更新这些行
Journeys = (p
| 'Read from BQ' >> beam.io.Read(
beam.io.BigQuerySource(query=query, dataset="dataset", use_standard_sql=True)))
Update = ( Journeys
| 'Updating Journey Table' >> beam.Map(UpdateBQ))
Write = (Journeys
| 'Write transform to BigQuery' >> WriteToBigQuery('table', TABLE_SCHEMA_CANONICAL))
此管道的问题在于,当您读取 table (beam.Map)
时,会为返回的 pcollection 中的每个项目执行 UpdateBQ
执行 BigQuery 更新的更好方法是什么table?
我想这可以在不使用 beam.Map 的情况下完成,并且只执行并一次更新所有输入 pcolletion 的进程。
额外
def UpdateBQ(input):
from google.cloud import bigquery
import uuid
import time
client = bigquery.Client()
STD = "#standardSQL"
QUERY = STD + "\n" + """UPDATE table SET Field= 'YYY' WHERE Field2='XXX'"""
client.use_legacy_sql = False
query_job = client.run_async_query(query=QUERY, job_name='temp-query-job_{}'.format(uuid.uuid4())) # API request
query_job.begin()
<...>
可能的解决方案
with beam.Pipeline(options=options) as p:
Journeys = (p
| 'Read from BQ' >> beam.io.Read(
beam.io.BigQuerySource(query=query, dataset="dataset", use_standard_sql=True))
)
Write = (Journeys
| 'Write transform to BigQuery' >> WriteToBigQuery('table', TABLE_SCHEMA_CANONICAL))
UpdateBQ();
从 BQ 读取后,您是否正在使用光束管道进行任何进一步的转换?或者它只是您在代码中显示的方式,即从 BQ 读取然后在 BQ 中触发更新命令?在这种情况下,您根本不需要光束。只需使用 BQ 查询来更新 table 中的数据并使用另一个 table。 BQ best practices 建议一次避免单行 insert/update。
想象一个简单的 Google 数据流管道。在此管道中,您使用 apache beam 函数从 BQ 读取,并且根据返回的 pcollection,您必须更新这些行
Journeys = (p
| 'Read from BQ' >> beam.io.Read(
beam.io.BigQuerySource(query=query, dataset="dataset", use_standard_sql=True)))
Update = ( Journeys
| 'Updating Journey Table' >> beam.Map(UpdateBQ))
Write = (Journeys
| 'Write transform to BigQuery' >> WriteToBigQuery('table', TABLE_SCHEMA_CANONICAL))
此管道的问题在于,当您读取 table (beam.Map)
时,会为返回的 pcollection 中的每个项目执行 UpdateBQ执行 BigQuery 更新的更好方法是什么table?
我想这可以在不使用 beam.Map 的情况下完成,并且只执行并一次更新所有输入 pcolletion 的进程。
额外
def UpdateBQ(input):
from google.cloud import bigquery
import uuid
import time
client = bigquery.Client()
STD = "#standardSQL"
QUERY = STD + "\n" + """UPDATE table SET Field= 'YYY' WHERE Field2='XXX'"""
client.use_legacy_sql = False
query_job = client.run_async_query(query=QUERY, job_name='temp-query-job_{}'.format(uuid.uuid4())) # API request
query_job.begin()
<...>
可能的解决方案
with beam.Pipeline(options=options) as p:
Journeys = (p
| 'Read from BQ' >> beam.io.Read(
beam.io.BigQuerySource(query=query, dataset="dataset", use_standard_sql=True))
)
Write = (Journeys
| 'Write transform to BigQuery' >> WriteToBigQuery('table', TABLE_SCHEMA_CANONICAL))
UpdateBQ();
从 BQ 读取后,您是否正在使用光束管道进行任何进一步的转换?或者它只是您在代码中显示的方式,即从 BQ 读取然后在 BQ 中触发更新命令?在这种情况下,您根本不需要光束。只需使用 BQ 查询来更新 table 中的数据并使用另一个 table。 BQ best practices 建议一次避免单行 insert/update。