为加载作业指定的空架构 - 加载 CSV
Empty schema specified for the load job - Load CSV
我正在尝试通过 GoogleCloudStorageToBigQueryOperator 任务将 CSV 文件从 Google Cloud Storage 加载到空的 Google Big Query table 中。
t8 = GoogleCloudStorageToBigQueryOperator(
task_id='gcs_send_dim_report',
bucket='report',
source_objects=[
'gs://report/test-dim-report/dim_report_{{ ds_nodash }}.csv'
],
schema_fields=['filename_pdf','filename_png', 'week_date', 'code'],
skip_leading_rows=1,
source_format = 'CSV',
create_disposition='CREATE_IF_NEEDED',
write_disposition='WRITE_TRUNCATE',
destination_project_dataset_table='xxxx-yyyy:report.test_dim_report_{{ ds_nodash }}',
dag=dag
)
要加载的 table 已经在 Big Query 中定义了架构,即便如此,为了面对这个错误,我在我正在使用的 CSV 的列中添加了参数 schema_fields
。查看任务日志,首先遇到如下依赖错误:
from google.appengine.api import memcache
[2018-06-22 05:58:49,650] {base_task_runner.py:98} INFO - Subtask: ImportError: No module named 'google.appengine'
[2018-06-22 05:58:49,650] {base_task_runner.py:98} INFO - Subtask:
[2018-06-22 05:58:49,651] {base_task_runner.py:98} INFO - Subtask: During handling of the above exception, another exception occurred:
[2018-06-22 05:58:49,651] {base_task_runner.py:98} INFO - Subtask:
[2018-06-22 05:58:49,651] {base_task_runner.py:98} INFO - Subtask: Traceback (most recent call last):
[2018-06-22 05:58:49,652] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python3.5/dist-packages/googleapiclient/discovery_cache/file_cache.py", line 33, in <module>
[2018-06-22 05:58:49,652] {base_task_runner.py:98} INFO - Subtask: from oauth2client.contrib.locked_file import LockedFile
[2018-06-22 05:58:49,652] {base_task_runner.py:98} INFO - Subtask: ImportError: No module named 'oauth2client.contrib.locked_file'
在日志的最后,显示最后的错误:
Exception: BigQuery job failed. Final error was: {'reason': 'invalid', 'message': 'Empty schema specified for the load job. Please specify a schema that describes the data being loaded.'}.
我正在寻找解决该错误的方法,以便将我的 CSV 文件成功加载到 Google Big Query
有两种方法可以实现这一点。这全部来自代码文档,还有这个初始位:
The schema to be used for the BigQuery table may be specified in one of
two ways. You may either directly pass the schema fields in, or you may
point the operator to a Google cloud storage object name. The object in
Google cloud storage must be a JSON file with the schema fields in it.
- 正确定义
schema_fields
,如 GoogleCloudStorageToBigQueryOperator
的文档所示。可以在此处找到如何定义架构的示例:https://cloud.google.com/bigquery/docs/schemas
If set, the schema field list as defined here:
https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load
Should not be set when source_format is 'DATASTORE_BACKUP'.
示例(来自示例 link):
schema = [
bigquery.SchemaField('full_name', 'STRING', mode='REQUIRED'),
bigquery.SchemaField('age', 'INTEGER', mode='REQUIRED'),
]
- 设置
schema_object
.
If set, a GCS object path pointing to a .json file that
contains the schema for the table. (templated)
正如 dboshardy 所指出的,tobi6 提供的答案导致以下错误:
ERROR - Object of type 'SchemaField' is not JSON serializable
如错误所示,SchemaField 不是 JSON 可序列化的 class,而参数 schema_fields 需要 JSON 可序列化的对象。
根据 Airflow 文档,解决方案是将模式作为字典列表传递:https://airflow.apache.org/docs/stable/_api/airflow/contrib/operators/bigquery_operator/index.html#airflow.contrib.operators.bigquery_operator.BigQueryCreateEmptyTableOperator
示例(基于 OP 问题):
schema = [
{"name": "filename_pdf", "type": "STRING", "mode": "REQUIRED"},
{"name": "filename_png", "type": "STRING", "mode": "REQUIRED"},
{"name": "week_date", "type": "DATE", "mode": "REQUIRED"},
{"name": "code", "type": "INTEGER", "mode": "NULLABLE"}
]
所提供的解决方案已通过 Google Cloud Composer (airflow v1.10.6) 成功测试类似问题。
我正在尝试通过 GoogleCloudStorageToBigQueryOperator 任务将 CSV 文件从 Google Cloud Storage 加载到空的 Google Big Query table 中。
t8 = GoogleCloudStorageToBigQueryOperator(
task_id='gcs_send_dim_report',
bucket='report',
source_objects=[
'gs://report/test-dim-report/dim_report_{{ ds_nodash }}.csv'
],
schema_fields=['filename_pdf','filename_png', 'week_date', 'code'],
skip_leading_rows=1,
source_format = 'CSV',
create_disposition='CREATE_IF_NEEDED',
write_disposition='WRITE_TRUNCATE',
destination_project_dataset_table='xxxx-yyyy:report.test_dim_report_{{ ds_nodash }}',
dag=dag
)
要加载的 table 已经在 Big Query 中定义了架构,即便如此,为了面对这个错误,我在我正在使用的 CSV 的列中添加了参数 schema_fields
。查看任务日志,首先遇到如下依赖错误:
from google.appengine.api import memcache
[2018-06-22 05:58:49,650] {base_task_runner.py:98} INFO - Subtask: ImportError: No module named 'google.appengine'
[2018-06-22 05:58:49,650] {base_task_runner.py:98} INFO - Subtask:
[2018-06-22 05:58:49,651] {base_task_runner.py:98} INFO - Subtask: During handling of the above exception, another exception occurred:
[2018-06-22 05:58:49,651] {base_task_runner.py:98} INFO - Subtask:
[2018-06-22 05:58:49,651] {base_task_runner.py:98} INFO - Subtask: Traceback (most recent call last):
[2018-06-22 05:58:49,652] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python3.5/dist-packages/googleapiclient/discovery_cache/file_cache.py", line 33, in <module>
[2018-06-22 05:58:49,652] {base_task_runner.py:98} INFO - Subtask: from oauth2client.contrib.locked_file import LockedFile
[2018-06-22 05:58:49,652] {base_task_runner.py:98} INFO - Subtask: ImportError: No module named 'oauth2client.contrib.locked_file'
在日志的最后,显示最后的错误:
Exception: BigQuery job failed. Final error was: {'reason': 'invalid', 'message': 'Empty schema specified for the load job. Please specify a schema that describes the data being loaded.'}.
我正在寻找解决该错误的方法,以便将我的 CSV 文件成功加载到 Google Big Query
有两种方法可以实现这一点。这全部来自代码文档,还有这个初始位:
The schema to be used for the BigQuery table may be specified in one of two ways. You may either directly pass the schema fields in, or you may point the operator to a Google cloud storage object name. The object in Google cloud storage must be a JSON file with the schema fields in it.
- 正确定义
schema_fields
,如GoogleCloudStorageToBigQueryOperator
的文档所示。可以在此处找到如何定义架构的示例:https://cloud.google.com/bigquery/docs/schemas
If set, the schema field list as defined here: https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load Should not be set when source_format is 'DATASTORE_BACKUP'.
示例(来自示例 link):
schema = [
bigquery.SchemaField('full_name', 'STRING', mode='REQUIRED'),
bigquery.SchemaField('age', 'INTEGER', mode='REQUIRED'),
]
- 设置
schema_object
.
If set, a GCS object path pointing to a .json file that contains the schema for the table. (templated)
正如 dboshardy 所指出的,tobi6 提供的答案导致以下错误:
ERROR - Object of type 'SchemaField' is not JSON serializable
如错误所示,SchemaField 不是 JSON 可序列化的 class,而参数 schema_fields 需要 JSON 可序列化的对象。
根据 Airflow 文档,解决方案是将模式作为字典列表传递:https://airflow.apache.org/docs/stable/_api/airflow/contrib/operators/bigquery_operator/index.html#airflow.contrib.operators.bigquery_operator.BigQueryCreateEmptyTableOperator
示例(基于 OP 问题):
schema = [
{"name": "filename_pdf", "type": "STRING", "mode": "REQUIRED"},
{"name": "filename_png", "type": "STRING", "mode": "REQUIRED"},
{"name": "week_date", "type": "DATE", "mode": "REQUIRED"},
{"name": "code", "type": "INTEGER", "mode": "NULLABLE"}
]
所提供的解决方案已通过 Google Cloud Composer (airflow v1.10.6) 成功测试类似问题。