AssertionError: INTERNAL: No default project is specified

AssertionError: INTERNAL: No default project is specified

气流的新手。尝试 运行 sql 并将结果存储在 BigQuery table 中。

出现以下错误。不确定在哪里设置 default_rpoject_id.

请帮助我。

错误:

Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 28, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/dist-packages/airflow/bin/cli.py", line 585, in test
    ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True)
  File "/usr/local/lib/python2.7/dist-packages/airflow/utils/db.py", line 53, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1374, in run
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/operators/bigquery_operator.py", line 82, in execute
    self.allow_large_results, self.udf_config, self.use_legacy_sql)
  File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/hooks/bigquery_hook.py", line 228, in run_query
    default_project_id=self.project_id)
  File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/hooks/bigquery_hook.py", line 917, in _split_tablename
    assert default_project_id is not None, "INTERNAL: No default project is specified"
AssertionError: INTERNAL: No default project is specified

代码:

sql_bigquery = BigQueryOperator(
        task_id='sql_bigquery',
        use_legacy_sql=False,
        write_disposition='WRITE_TRUNCATE',
        allow_large_results=True,
        bql='''
            #standardSQL
                SELECT ID, Name, Group, Mark, RATIO_TO_REPORT(Mark) OVER(PARTITION BY Group) AS percent FROM `tensile-site-168620.temp.marks`
                ''',
        destination_dataset_table='temp.percentage',
        dag=dag
        )

编辑:我最终解决了这个问题,只需在 BigQueryOperator 任务中添加 bigquery_conn_id='bigquery' 参数,在单独的 python 脚本中 运行 下面的代码之后。

显然您需要在 Airflow 的 Admin -> Connection 中指定您的项目 ID UI。您必须将其作为 JSON 对象执行,例如 "project" : "".

就我个人而言,我无法让网络服务器在 GCP 上运行,所以这是不可行的。这里有一个程序化的解决方案:

from airflow.models import Connection
from airflow.settings import Session

session = Session()
gcp_conn = Connection(
    conn_id='bigquery',
    conn_type='google_cloud_platform',
    extra='{"extra__google_cloud_platform__project":"<YOUR PROJECT HERE>"}')
if not session.query(Connection).filter(
        Connection.conn_id == gcp_conn.conn_id).first():
    session.add(gcp_conn)
    session.commit()

这些建议来自

当 运行 本地气流时,我得到同样的错误。我的解决方案是添加以下连接字符串作为环境变量:

AIRFLOW_CONN_BIGQUERY_DEFAULT="google-cloud-platform://?extra__google_cloud_platform__project=<YOUR PROJECT HERE>"

BigQueryOperator 使用“bigquery_default”连接。未指定时,本地气流使用缺少 属性 project_id 的内部连接版本。如您所见,上面的连接字符串提供了 project_id 属性.

启动时,Airflow 将以“AIRFLOW_”开头的环境变量加载到内存中。如气流文档 here 中所述,此机制可用于覆盖气流属性并在本地 运行 时提供连接。请注意,这在 运行 直接气流而不启动 Web 服务器的情况下也有效。

所以我已经为我的所有连接设置了环境变量,例如 AIRFLOW_CONN_MYSQL_DEFAULT。我已将它们放入从我的 IDE 获取的 .ENV 文件中,但将它们放入您的 .bash_profile 中也可以正常工作。

当您在 Cloud Composer 上查看气流实例时,您会看到在“bigquery_default”连接处设置了 project_id属性。这就是 BigQueryOperator 在 运行 通过 Cloud Composer 时起作用的原因。

(我使用的是 airflow 1.10.2 和 BigQuery 1.10.2)