How to copy a non partitioned table into an ingestion time partitioned table in bigquery using python?

The use case is the following: we have a table foo whose data is replaced every day. We want to start keeping the old data in an ingestion-time partitioned history table based on foo, called foo_HIST.

I have the following code, using google-cloud-bigquery 1.6.1:

from google.cloud import bigquery

bq_client = bigquery.Client(project=env_conf.gcp_project_id)
dataset = bigquery.dataset.DatasetReference(
    env_conf.gcp_project_id, env_conf.bq_dataset
)

full_table_src = table_conf.table_name()
table_src = dataset.table(full_table_src)
table_dst_name = f"{full_table_src}_HIST"
table_dst = dataset.table(table_dst_name)
table_dst.time_partitioning = bigquery.TimePartitioning(
    type_=bigquery.TimePartitioningType.HOUR,
)

# Truncate per partition.
job_config = bigquery.CopyJobConfig(
    create_disposition="CREATE_IF_NEEDED",
    write_disposition="WRITE_TRUNCATE",
)

job = bq_client.copy_table(table_src, table_dst, job_config=job_config)

The new table is indeed created, but when I inspect it with the bq CLI, it does not appear to be partitioned. Here is the output:

bq show --format=prettyjson dataset_id.foo_HIST

{
  "creationTime": "1616418131814",
  "etag": "iqfdDzv2ifdsfERfwTiFjQ==",
  "id": "project_id:dataset_id.foo_HIST",
  "kind": "bigquery#table",
  "lastModifiedTime": "1616418131814",
  "location": "EU",
  "numBytes": "32333",
  "numLongTermBytes": "0",
  "numRows": "406",
  "schema": {
    "fields": [
      {
        "mode": "NULLABLE",
        "name": "MPG",
        "type": "FLOAT"
      }
    ]
  },
  "selfLink": "https://bigquery.googleapis.com/bigquery/v2/projects/project_id/datasets/dataset_id/tables/foo_HIST",
  "tableReference": {
    "datasetId": "dataset_id",
    "projectId": "project_id",
    "tableId": "foo_HIST"
  },
  "type": "TABLE"
}
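For comparison, my understanding is that if the table had actually been created as ingestion-time partitioned, the same bq show output would additionally contain a timePartitioning block, along the lines of:

```json
"timePartitioning": {
  "type": "HOUR"
}
```

That block is missing entirely here, which is what makes me think the partitioning setting on the destination was silently ignored.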

For anyone wondering how to copy a non-partitioned table into a partitioned table (creating it if needed) in Python:

It seems CopyJob does not support this out of the box, unlike QueryJob. Here is the final snippet using QueryJob:

    from google.cloud import bigquery

    bq_client = bigquery.Client(project=gcp_project_id)
    dataset = bigquery.dataset.DatasetReference(
        gcp_project_id, dataset_id
    )

    table_src = dataset.table(table_name)
    table_dst_name = f"{table_name}_HIST"
    table_dst = dataset.table(table_dst_name)
    query = f"""
    SELECT *
    FROM `{gcp_project_id}.{dataset_id}.{table_name}`
    """

    job_config = bigquery.QueryJobConfig(
        create_disposition="CREATE_IF_NEEDED",
        write_disposition="WRITE_APPEND",
        time_partitioning=bigquery.TimePartitioning(
            type_=bigquery.TimePartitioningType.HOUR,
        ),
        use_legacy_sql=False,
        allow_large_results=True,
        destination=table_dst,
    )
    job = bq_client.query(query, job_config=job_config)
    job.result()  # Wait for job to finish
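One detail worth calling out: with use_legacy_sql=False, the table in the FROM clause must be qualified with dots inside a single pair of backticks, not with the legacy `project:dataset.table` colon form. A minimal sketch of the string construction, with hypothetical placeholder values standing in for the real config:

```python
# Hypothetical placeholder values standing in for the real config.
gcp_project_id = "my-project"
dataset_id = "my_dataset"
table_name = "foo"

# Standard SQL: dot-separated, one pair of backticks around the whole
# reference. The colon-separated form is legacy-SQL syntax and fails
# when use_legacy_sql=False.
query = f"""
SELECT *
FROM `{gcp_project_id}.{dataset_id}.{table_name}`
"""

print(query)
```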