使用 python 将 bigquery 分区替换为存储在 bigquery table 中的数据
replace bigquery partition with data staged in bigquery table using python
我在 bigquery 中的每日分区 table 中暂存了 30 天的数据。我有一个更大的 table,每天对 5 年的数据进行分区。我需要从 table 阶段 select 并在我的阶段 table 中的 30 天内替换较大 table 中现有分区的全部内容。我的偏好是使用 Python 来执行此操作,而不是先将数据提取到 csv,然后再将其加载回 BQ(如果可以避免的话)。有什么建议么?提前致谢。
您需要做的就是查询您需要的内容并为您的查询设置目标table。
from google.cloud import bigquery
client = bigquery.Client()
query = """\
SELECT firstname + ' ' + last_name AS full_name,
FLOOR(DATEDIFF(CURRENT_DATE(), birth_date) / 365) AS age
FROM dataset_name.persons
"""
dataset = client.dataset('dataset_name')
table = dataset.table(name='person_ages')
job = client.run_async_query('fullname-age-query-job', query)
job.destination = table
job.write_disposition= 'truncate'
job.begin()
这实际上对我不起作用,但我确实认为它是正确的,尽管是针对旧版本的大查询客户端库。您的回答确实有很大帮助,我会接受。我正在使用最新的库。以下对我有用:
for partition in gbq.list_partitions(stage_table_ref):
table_partition = table_name+'$'+partition
stage_partition = stage_dataset.table(table_partition)
target_partition = target_dataset.table(table_partition)
job_config = bigquery.CopyJobConfig()
job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
gbq.copy_table(stage_partition, target_partition,job_config = job_config)
我在 bigquery 中的每日分区 table 中暂存了 30 天的数据。我有一个更大的 table,每天对 5 年的数据进行分区。我需要从 table 阶段 select 并在我的阶段 table 中的 30 天内替换较大 table 中现有分区的全部内容。我的偏好是使用 Python 来执行此操作,而不是先将数据提取到 csv,然后再将其加载回 BQ(如果可以避免的话)。有什么建议么?提前致谢。
您需要做的就是查询您需要的内容并为您的查询设置目标table。
from google.cloud import bigquery
client = bigquery.Client()
query = """\
SELECT firstname + ' ' + last_name AS full_name,
FLOOR(DATEDIFF(CURRENT_DATE(), birth_date) / 365) AS age
FROM dataset_name.persons
"""
dataset = client.dataset('dataset_name')
table = dataset.table(name='person_ages')
job = client.run_async_query('fullname-age-query-job', query)
job.destination = table
job.write_disposition= 'truncate'
job.begin()
这实际上对我不起作用,但我确实认为它是正确的,尽管是针对旧版本的大查询客户端库。您的回答确实有很大帮助,我会接受。我正在使用最新的库。以下对我有用:
for partition in gbq.list_partitions(stage_table_ref):
table_partition = table_name+'$'+partition
stage_partition = stage_dataset.table(table_partition)
target_partition = target_dataset.table(table_partition)
job_config = bigquery.CopyJobConfig()
job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
gbq.copy_table(stage_partition, target_partition,job_config = job_config)