JSON table schema to bigquery.TableSchema for BigQuerySink
I have a non-trivial table schema (involving nested and repeated fields) defined in JSON format (with the name, type, and mode attributes) and stored in a file. It has already been used successfully to populate a BigQuery table with the bq load command.
But when I try to do the same thing with the Dataflow Python SDK and BigQuerySink, the schema argument needs to be either a comma-separated list of 'name':'type' elements, or a bigquery.TableSchema object.
Is there any convenient way of getting my JSON schema to a bigquery.TableSchema, or do I have to transform it into a name:value list?
Currently you cannot directly specify a JSON schema. You have to specify the schema either as a string that contains a comma-separated list of fields, or as a bigquery.TableSchema object.
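For reference, the string form is just comma-separated name:TYPE pairs and only works for flat schemas; a minimal hypothetical example (field names are placeholders):
schema = 'fullName:STRING,age:INTEGER'  # passed as the schema argument of BigQuerySink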
If the schema is complicated and contains nested and/or repeated fields, we recommend building a bigquery.TableSchema object.
Here is an example bigquery.TableSchema object with nested and repeated fields:
from apache_beam.io.gcp.internal.clients import bigquery  # BigQuery client bundled with the Beam/Dataflow SDK
table_schema = bigquery.TableSchema()
# 'string' field
field_schema = bigquery.TableFieldSchema()
field_schema.name = 'fullName'
field_schema.type = 'string'
field_schema.mode = 'required'
table_schema.fields.append(field_schema)
# 'integer' field
field_schema = bigquery.TableFieldSchema()
field_schema.name = 'age'
field_schema.type = 'integer'
field_schema.mode = 'nullable'
table_schema.fields.append(field_schema)
# nested field
field_schema = bigquery.TableFieldSchema()
field_schema.name = 'phoneNumber'
field_schema.type = 'record'
field_schema.mode = 'nullable'
area_code = bigquery.TableFieldSchema()
area_code.name = 'areaCode'
area_code.type = 'integer'
area_code.mode = 'nullable'
field_schema.fields.append(area_code)
number = bigquery.TableFieldSchema()
number.name = 'number'
number.type = 'integer'
number.mode = 'nullable'
field_schema.fields.append(number)
table_schema.fields.append(field_schema)
# repeated field
field_schema = bigquery.TableFieldSchema()
field_schema.name = 'children'
field_schema.type = 'string'
field_schema.mode = 'repeated'
table_schema.fields.append(field_schema)
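As a rough sketch that is not part of the original answer, the resulting table_schema can then be handed to the sink (the project, dataset and table names below are placeholders):
import apache_beam as beam

# Pass the TableSchema built above to BigQuerySink.
sink = beam.io.BigQuerySink(
    'my_table',                      # placeholder table name
    dataset='my_dataset',            # placeholder dataset
    project='my-project',            # placeholder project
    schema=table_schema,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)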
I had the same problem. In my case I had already loaded some JSON into BigQuery and let it auto-generate a schema.
So I was able to dump the auto-generated schema with the following command:
bq show --format prettyjson my-gcp-project:my-bq-table | jq .schema > my-bq-table.json
The schema can then be converted to a bigquery.TableSchema with this snippet:
from apache_beam.io.gcp.internal.clients import bigquery


def _get_field_schema(**kwargs):
    # Build a TableFieldSchema from a dict in the BigQuery JSON schema format,
    # recursing into sub-fields for RECORD columns.
    field_schema = bigquery.TableFieldSchema()
    field_schema.name = kwargs['name']
    field_schema.type = kwargs.get('type', 'STRING')
    field_schema.mode = kwargs.get('mode', 'NULLABLE')
    fields = kwargs.get('fields')
    if fields:
        for field in fields:
            field_schema.fields.append(_get_field_schema(**field))
    return field_schema


def _inject_fields(fields, table_schema):
    for field in fields:
        table_schema.fields.append(_get_field_schema(**field))


def parse_bq_json_schema(schema):
    # Expects a dict with a top-level 'fields' key, as produced by `jq .schema`.
    table_schema = bigquery.TableSchema()
    _inject_fields(schema['fields'], table_schema)
    return table_schema
It works with the BigQuery JSON schema spec, and if you are lazy like me you can skip specifying type and mode for a field if you are happy with it defaulting to a nullable STRING.
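For example, a minimal sketch of tying it together, assuming the schema was dumped to my-bq-table.json with the bq command above and that parse_bq_json_schema from the snippet is in scope:
import json

# Load the schema dumped by `bq show ... | jq .schema` and convert it.
with open('my-bq-table.json') as f:
    bq_json_schema = json.load(f)

table_schema = parse_bq_json_schema(bq_json_schema)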
The snippet posted above by Andrea Pierleoni works with older versions of the google-cloud-bigquery Python client, for example version 0.25.0 of google-cloud-bigquery, which happens to be installed via pip install apache-beam[gcp].
However, the BigQuery Python client API has changed drastically in more recent versions of google-cloud-bigquery; for example, in version 1.8.0 that I am currently using, bigquery.TableFieldSchema() and bigquery.TableSchema() do not work.
If you are using a more recent version of the google-cloud-bigquery package, here is how you can get the required list of SchemaField objects (needed to create a table, for example) from a JSON file. This is an adaptation of the code posted above by Andrea Pierleoni (thanks!):
import json

from google.cloud import bigquery


def _get_field_schema(field):
    # Build a SchemaField, recursing into sub-fields for RECORD columns.
    name = field['name']
    field_type = field.get('type', 'STRING')
    mode = field.get('mode', 'NULLABLE')
    fields = field.get('fields', [])

    subschema = []
    for f in fields:
        fields_res = _get_field_schema(f)
        subschema.append(fields_res)

    field_schema = bigquery.SchemaField(name=name,
                                        field_type=field_type,
                                        mode=mode,
                                        fields=subschema)
    return field_schema


def parse_bq_json_schema(schema_filename):
    # Read a BigQuery JSON schema file (a JSON array of field definitions)
    # and return a list of SchemaField objects.
    schema = []
    with open(schema_filename, 'r') as infile:
        jsonschema = json.load(infile)

    for field in jsonschema:
        schema.append(_get_field_schema(field))

    return schema
Now, say you had a table's schema already defined in JSON, for example in a particular "schema.json" file. Using the above helper methods, you can get the SchemaField representation required by the Python client like so:
>>> res_schema = parse_bq_json_schema("schema.json")
>>> print(res_schema)
[SchemaField(u'event_id', u'INTEGER', u'REQUIRED', None, ()), SchemaField(u'event_name', u'STRING', u'REQUIRED', None, ()), SchemaField(u'event_types', u'STRING', u'REPEATED', None, ()), SchemaField(u'product_code', u'STRING', u'REQUIRED', None, ()), SchemaField(u'product_sub_code', u'STRING', u'REPEATED', None, ()), SchemaField(u'source', u'RECORD', u'REQUIRED', None, (SchemaField(u'internal', u'RECORD', u'NULLABLE', None, (SchemaField(u'name', u'STRING', u'REQUIRED', None, ()), SchemaField(u'timestamp', u'TIMESTAMP', u'REQUIRED', None, ()))), SchemaField(u'external', u'RECORD', u'NULLABLE', None, (SchemaField(u'name', u'STRING', u'REQUIRED', None, ()), SchemaField(u'timestamp', u'TIMESTAMP', u'REQUIRED', None, ()))))), SchemaField(u'timestamp', u'TIMESTAMP', u'REQUIRED', None, ()), SchemaField(u'user_key', u'RECORD', u'REQUIRED', None, (SchemaField(u'device_id', u'STRING', u'NULLABLE', None, ()), SchemaField(u'cookie_id', u'STRING', u'NULLABLE', None, ()), SchemaField(u'profile_id', u'STRING', u'NULLABLE', None, ()), SchemaField(u'best_id', u'STRING', u'REQUIRED', None, ()))), SchemaField(u'message_id', u'STRING', u'REQUIRED', None, ()), SchemaField(u'message_type', u'STRING', u'REQUIRED', None, ()), SchemaField(u'tracking_id', u'STRING', u'NULLABLE', None, ()), SchemaField(u'funnel_stage', u'STRING', u'NULLABLE', None, ()), SchemaField(u'location', u'RECORD', u'NULLABLE', None, (SchemaField(u'latitude', u'FLOAT', u'REQUIRED', None, ()), SchemaField(u'longitude', u'FLOAT', u'REQUIRED', None, ()), SchemaField(u'geo_region_id', u'INTEGER', u'NULLABLE', None, ()))), SchemaField(u'campaign_id', u'STRING', u'NULLABLE', None, ()), SchemaField(u'topic', u'STRING', u'REQUIRED', None, ())]
Now, to create a table having the above schema using the Python SDK, you would do:
dataset_ref = bqclient.dataset('YOUR_DATASET')
table_ref = dataset_ref.table('YOUR_TABLE')
table = bigquery.Table(table_ref, schema=res_schema)
You could optionally set up time-based partitioning like this (if needed):
table.time_partitioning = bigquery.TimePartitioning(
    type_=bigquery.TimePartitioningType.DAY,
    field='timestamp'  # name of column to use for partitioning
)
And this would finally create the table:
table = bqclient.create_table(table)
print('Created table {}, partitioned on column {}'.format(
    table.table_id, table.time_partitioning.field))
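For completeness, bqclient above is assumed to be an ordinary BigQuery client instance, e.g.:
from google.cloud import bigquery

bqclient = bigquery.Client()  # uses the default project and credentials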
Here is a simple program that might help you:
import json

from apache_beam.io.gcp.internal.clients import bigquery


def bq_schema(json_schema):
    # Read a JSON array of {name, type, mode} field definitions and
    # build the corresponding bigquery.TableSchema.
    table_schema = bigquery.TableSchema()
    with open(json_schema) as json_file:
        data = json.load(json_file)
        for p in data:
            field = bigquery.TableFieldSchema()
            field.name = p['name']
            field.type = p['type']
            field.mode = p['mode']
            table_schema.fields.append(field)
    return table_schema
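A minimal usage sketch (the file name, table spec, and example row are assumptions, not part of the original answer; the row must match whatever fields your schema.json defines):
import apache_beam as beam

table_schema = bq_schema('schema.json')

with beam.Pipeline() as p:
    (p
     | beam.Create([{'name': 'example'}])           # placeholder rows
     | beam.io.WriteToBigQuery(
         'my-project:my_dataset.my_table',          # placeholder table spec
         schema=table_schema,
         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))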
Nowadays you can use the built-in parse_table_schema_from_json function:
from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json
with open('schema.json') as f:
    schema_string = f.read()

table_schema = parse_table_schema_from_json(schema_string)
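Note that parse_table_schema_from_json expects the JSON to have a top-level "fields" key, the same shape produced by the bq show ... | jq .schema command earlier. If your file instead holds a bare JSON array of field definitions, a small sketch of wrapping it (the file name is an assumption):
import json

with open('schema_fields.json') as f:   # hypothetical file containing a bare array of fields
    fields = json.load(f)

table_schema = parse_table_schema_from_json(json.dumps({'fields': fields}))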
There is a built-in converter function in the BigQuery library:
from google.cloud import bigquery
...
client = bigquery.Client()
client.schema_from_json('path/to/schema.json')
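schema_from_json returns a list of SchemaField objects, so it can be fed straight into table creation, mirroring the earlier answer; a minimal sketch with placeholder dataset and table names:
schema = client.schema_from_json('path/to/schema.json')

table_ref = client.dataset('my_dataset').table('my_table')   # placeholder names
table = client.create_table(bigquery.Table(table_ref, schema=schema))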