BigQuery:带有 WriteToBigQuery 的 Array<string> 字段

BigQuery: Array<string> field with WriteToBigQuery

我正在 Python 中创建一个 Google 数据流模板:

query = "#standardSQL" +  """
SELECT
  Frame.Serial,
  Frame.Fecha,
  Frame.Longitud,
  Frame.Latitud,
  ARRAY_AGG (CONCAT (ID, '-', Valor) ORDER BY ID) AS Resumen
FROM <...>


TABLE_SCHEMA = 'Serial:STRING,Fecha:DATETIME,Longitud:STRING,Latitud:STRING,Resumen:STRING'

| 'Read from BQ' >> beam.io.Read(beam.io.BigQuerySource(query=query,dataset="xxx",use_standard_sql=True))

| 'Write transform to BigQuery' >> WriteToBigQuery('table',TABLE_SCHEMA)

问题

这失败了,因为 Resumen 字段是一个数组:

Array specified for non-repeated field.


我测试了什么

  1. 直接在 BigQuery UI 中创建 table 语句:

    CREATE TABLE test (Resumen ARRAY<STRING>)

    这行得通。 table 是通过以下方式创建的:

    • 类型:string
    • 模式:Repeated
  2. 更改TABLE_SCHEMA和运行管道:

    TABLE_SCHEMA ='Serial:STRING,Fecha:DATETIME,Longitud:STRING,Latitud:STRING,Resumen:ARRAY<STRING>'

    出现错误:

    "Invalid value for: ARRAY\u003cSTRING\u003e is not a valid value".
    

TABLE_SCHEMA 应该如何创建 table 并与 beam.io.WriteToBigQuery() 一起使用?

如果您在单个字符串中指定 BQ 模式,则似乎不支持重复或嵌套字段:https://beam.apache.org/documentation/io/built-in/google-bigquery/#creating-a-table-schema

您需要明确描述您的架构并将字段模式设置为 repeatedhttps://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py#L95

# A repeated field.
children_schema = bigquery.TableFieldSchema()
children_schema.name = 'children'
children_schema.type = 'string'
children_schema.mode = 'repeated'
table_schema.fields.append(children_schema)