BigQuery Dynamic SQL with Python

GCP BigQuery recently added support for dynamic SQL. I want to try it from Cloud Functions.

My BQ dynamic SQL (works in the UI):

declare cols string;
set cols=(select STRING_AGG (column_name,',') from `my_db.INFORMATION_SCHEMA.COLUMNS` where table_name='tbla');
EXECUTE IMMEDIATE format("""select %s from `my_db.tbla`""",cols);

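I want to pass the table_name value from my Python code. Does the Python BigQuery library support this?

Is there any sample Python code?

I tried the following code, but it did not work.

Code 1: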

def hello_gcs(event, context):
    table_name='tbla'
    client = bigquery.Client()
    job_config = bigquery.QueryJobConfig(use_legacy_sql=False)
    sql=( '''
declare cols string;
set cols=(select STRING_AGG (column_name,',') from `my_db.INFORMATION_SCHEMA.COLUMNS` where table_name=?);
EXECUTE IMMEDIATE format("""select @col from `my_db.tbla`""") using cols
''',(table_name))
    query_job = client.query(sql, job_config=job_config)
    results = query_job.result()  
    for row in results:
       print("{} : {} views".format(row.url, row.view_count))

Error:

line 130, in result raise self._exception google.api_core.exceptions.BadRequest: 400 Query error: Positional parameters are not supported at [3:104]

Code 2:

from google.cloud import bigquery

def hello_gcs(event, context):
    table_name='tbla'
    client = bigquery.Client()
    job_config = bigquery.QueryJobConfig(use_legacy_sql=False)
    sql=( '''
declare cols string;
set cols=(select STRING_AGG (column_name,',') from `my_db.INFORMATION_SCHEMA.COLUMNS` where table_name=%s);
EXECUTE IMMEDIATE format("""select @col from `my_db.tbla`""") using cols
''',(table_name))
    query_job = client.query(sql, job_config=job_config)
    results = query_job.result()  
    for row in results:
       print("{} : {} views".format(row.url, row.view_count))

Error:

line 130, in result raise self._exception google.api_core.exceptions.BadRequest: 400 Syntax error: Illegal input character "%" at [3:104]

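I finally found the correct syntax, and I also noticed some mistakes in my code.

Mistakes:

  1. The query in the sql variable was built with the wrong syntax (sql=('''.......''')).

  2. The print statement on the last line was wrong: my select query has no url and view_count columns.

  3. In dynamic SQL, a string value in the where condition has to be wrapped in single quotes.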
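Mistake 1 can be reproduced without BigQuery. A minimal sketch, assuming the same `table_name` value as in the question: wrapping a string and a value in parentheses builds a Python tuple rather than substituting the value, while `str.format` produces the finished SQL text, including the single quotes from mistake 3.

```python
table_name = 'tbla'

# What the failing attempts did: this is a 2-tuple (template, value),
# so the "?" / "%s" placeholder is never filled in.
attempt = ('''where table_name=%s''', (table_name))
print(type(attempt).__name__)  # tuple

# What the working code does: str.format fills in the value, and the
# single quotes in the template make it a SQL string literal.
fixed = "where table_name='{}'".format(table_name)
print(fixed)  # where table_name='tbla'
```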

Working example code:

Code 1:

from google.cloud import bigquery

table_name = 'tbla'
client = bigquery.Client()

job_config = bigquery.QueryJobConfig(use_legacy_sql=False)

# Triple single quotes let the script span several lines and contain """ unescaped.
# str.format fills the {} placeholder; the %s is left for BigQuery's format().
sql = '''declare cols string;
set cols=(select STRING_AGG (column_name,',')
from `my_db.INFORMATION_SCHEMA.COLUMNS` where table_name='{}');
EXECUTE IMMEDIATE format("""select %s from `my_db.tbla` """,cols)'''.format(table_name)

print(sql)

query_job = client.query(sql, job_config=job_config)

results = query_job.result()
for row in results:
    print(row)

Code 2:

from google.cloud import bigquery

table_name = 'tbla'
client = bigquery.Client()

job_config = bigquery.QueryJobConfig(use_legacy_sql=False)

# Note: USING binds cols to ? as a STRING value, so this select returns
# the column-name string for each row, not the listed columns themselves.
sql = '''declare cols string;
set cols=(select STRING_AGG (column_name,',')
from `my_db.INFORMATION_SCHEMA.COLUMNS` where table_name='{}');
EXECUTE IMMEDIATE format("""select ? from `my_db.tbla` """) using cols'''.format(table_name)

print(sql)

query_job = client.query(sql, job_config=job_config)

results = query_job.result()
for row in results:
    print(row)

Code 3:

from google.cloud import bigquery

table_name = 'tbla'
client = bigquery.Client()

job_config = bigquery.QueryJobConfig(use_legacy_sql=False)

# Note: as with ?, the named parameter @col is bound as a STRING value.
sql = '''declare cols string;
set cols=(select STRING_AGG (column_name,',')
from `my_db.INFORMATION_SCHEMA.COLUMNS` where table_name='{}');
EXECUTE IMMEDIATE format("""select @col from `my_db.tbla` """) using cols as col'''.format(table_name)

print(sql)

query_job = client.query(sql, job_config=job_config)

results = query_job.result()
for row in results:
    print(row)
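An alternative to `str.format` is a named query parameter, which avoids quoting the value by hand. This is a sketch under assumptions, not part of the original answer: it assumes BigQuery accepts named parameters such as `@table_name` inside a script (the error in the question only rules out positional ones), and the helper names `build_script` and `run_script` are made up for illustration. `ScalarQueryParameter` and the `query_parameters` argument of `QueryJobConfig` come from the `google-cloud-bigquery` client.

```python
def build_script():
    """Return the script text; @table_name is bound by the client, not by Python."""
    return '''declare cols string;
set cols=(select STRING_AGG(column_name, ',')
from `my_db.INFORMATION_SCHEMA.COLUMNS` where table_name=@table_name);
EXECUTE IMMEDIATE format("""select %s from `my_db.tbla`""", cols);'''


def run_script(table_name):
    # Imported here so build_script() can be inspected without the client installed.
    from google.cloud import bigquery

    client = bigquery.Client()
    job_config = bigquery.QueryJobConfig(
        use_legacy_sql=False,
        query_parameters=[
            # Name, BigQuery type, and value of the @table_name parameter.
            bigquery.ScalarQueryParameter("table_name", "STRING", table_name),
        ],
    )
    # result() waits for the job to finish, as in the code above.
    return list(client.query(build_script(), job_config=job_config).result())
```

Calling `run_script('tbla')` would then replace the formatted string. The table in the `from` clause stays hard-coded here, because parameters bind values, not identifiers.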

Jinja2 SQL templates are a better option for building dynamic SQL. Example:

create or replace table {{ params.targetTable }}
as
select
    {{ params.targetColumnList|join(',') }},
    cast(null as timestamp) as begin_timestamp,
    cast(null as timestamp) as end_timestamp
from
    {{ params.sourceTable }};
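The template above could be rendered from Python roughly as follows. This is a sketch that assumes the `jinja2` package is installed; `render_sql` is a hypothetical helper, and the table and column names in the example call are placeholders.

```python
TEMPLATE = """create or replace table {{ params.targetTable }}
as
select
    {{ params.targetColumnList|join(',') }},
    cast(null as timestamp) as begin_timestamp,
    cast(null as timestamp) as end_timestamp
from
    {{ params.sourceTable }};"""


def render_sql(params):
    # Imported lazily so the template text can be inspected without jinja2.
    from jinja2 import Template

    # The template reads everything from a single "params" mapping.
    return Template(TEMPLATE).render(params=params)


# Example call (placeholder names):
# render_sql({
#     "targetTable": "my_db.tblb",
#     "targetColumnList": ["id", "name"],
#     "sourceTable": "my_db.tbla",
# })
```

The rendered string can be passed to `client.query` in the same way as the `sql` variable in the earlier code. Jinja2 pastes the values in as plain text, so they should come from trusted input.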