Inserting repeated records into BigQuery with Java API/Dataflow - "Repeated field must be imported as a JSON array"

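I have data in which one of the fields of the BigQuery table schema is a repeated record of key-value (String, String) pairs.

I am trying to add these repeated records using the approach described here: http://sookocheff.com/post/bigquery/creating-a-big-query-table-java-api/

The table schema for the repeated record field is created as follows: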

TableFieldSchema column = new TableFieldSchema().setName("rawFields");
column.setType("RECORD");
List<TableFieldSchema> list = new ArrayList<>();
list.add(new TableFieldSchema().setName("key").setType("STRING"));
list.add(new TableFieldSchema().setName("value").setType("STRING"));
column.setFields(list);
column.setMode("REPEATED");

I am inserting the data like this, as part of a DoFn:

Map<String,String> record = ... // key-value pairs
List<TableRow> rawFields = new ArrayList<>();
record.forEach((k,v)->
    rawFields.add(new TableRow().set("key",k).set("value", v))
);
TableRow row = new TableRow();
// row has other fields, omitted here
row.set("rawFields", rawFields);

The DoFn sits in my Dataflow pipeline immediately before the BigQueryIO.Write step:

.apply(BigQueryIO.Write
        .named("WriteLBLogLines")
        .to("xxx:yyy.zzz")
        .withSchema(mySchema)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

When I try to run this through Dataflow, I get the following error:

errorResult: JSON table encountered too many errors, giving up. Rows: 1; errors: 1., error: JSON table encountered too many errors, giving up. Rows: 1; errors: 1., error: JSON parsing error in row starting at position 0 at file: gs://xxxxxxxxxx/12271006010671464167/dax-tmp-2016-06-28_14_47_26-12271006010671462904-S04-1-303c4f638f6b411b/-shard-00002-of-00003-try-021aff4c448b3177-endshard.json. Repeated field must be imported as a JSON array. Field: rawFields.
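
For reference, the error refers to the JSON file that Dataflow stages in GCS for the load job (the .json path in the message). In that file a field with mode REPEATED has to appear as a JSON array, even when it holds a single record. Below is a minimal, stdlib-only sketch of the expected shape. The class RepeatedFieldJson and its methods are hypothetical and exist only for illustration; they are not part of the BigQuery or Dataflow APIs, and the escaping is deliberately simplified.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper, for illustration only: renders key-value pairs in the
// JSON shape expected for a REPEATED RECORD field with "key" and "value".
final class RepeatedFieldJson {

    // Minimal escaping of backslashes and double quotes (not a full JSON encoder).
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // Each map entry becomes one {"key":...,"value":...} object inside a JSON array.
    static String toJsonArray(Map<String, String> record) {
        StringJoiner array = new StringJoiner(",", "[", "]");
        for (Map.Entry<String, String> e : record.entrySet()) {
            array.add("{\"key\":" + quote(e.getKey())
                    + ",\"value\":" + quote(e.getValue()) + "}");
        }
        return array.toString();
    }

    public static void main(String[] args) {
        Map<String, String> record = new LinkedHashMap<>();
        record.put("foo", "bar");
        // The repeated field is an array even with a single element.
        // Prints: {"rawFields":[{"key":"foo","value":"bar"}]}
        System.out.println("{\"rawFields\":" + toJsonArray(record) + "}");
    }
}
```

A row whose rawFields value is a bare object such as {"key":"foo","value":"bar"}, without the surrounding brackets, would not match this shape.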

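What is wrong with my approach? It seems I am not using the right method to insert repeated records.

I tried to reproduce the problem with the following code, but it executed successfully. Could some other aspect of the schema be the issue?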

List<TableFieldSchema> fields = new ArrayList<>();
TableFieldSchema column = new TableFieldSchema().setName("rawFields");
column.setType("RECORD");
List<TableFieldSchema> list = new ArrayList<>();
list.add(new TableFieldSchema().setName("key").setType("STRING"));
list.add(new TableFieldSchema().setName("value").setType("STRING"));
column.setFields(list);
column.setMode("REPEATED");
fields.add(column);
TableSchema schema = new TableSchema().setFields(fields);

TableRow row = new TableRow();
List<TableRow> rawFields = new ArrayList<>();
rawFields.add(new TableRow().set("key","foo").set("value", "bar"));
row.set("rawFields", rawFields);

Pipeline p = Pipeline.create(options);
PCollection<TableRow> c =
    p.apply(Create.of(row, row).withCoder(TableRowJsonCoder.of()));
c.apply(BigQueryIO.Write.named("BigQuery-Write")
        .to(options.getOutput())
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
        .withSchema(schema));
p.run();
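
If the minimal pipeline above succeeds while the real one fails, one way to narrow it down might be to check, inside the DoFn, that every row's value for the repeated field really is a list before it reaches the write step. This is only a sketch under assumptions: TableRow is a Map<String, Object> in the Google API client library, so the helper takes a plain Map to stay self-contained, and the names RepeatedFieldCheck and isRepeatedValue are hypothetical. A value that is a single record rather than a list is one plausible way to end up with this error, not a confirmed diagnosis.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical debugging helper; a plain Map stands in for TableRow here.
final class RepeatedFieldCheck {

    // True when the field is absent (or null) or holds a List, mirroring the
    // List<TableRow> pattern used above; false for any other value, such as
    // a single record, a Map, or a String.
    static boolean isRepeatedValue(Map<String, Object> row, String field) {
        Object value = row.get(field);
        return value == null || value instanceof List;
    }

    public static void main(String[] args) {
        Map<String, Object> entry = new HashMap<>();
        entry.put("key", "foo");
        entry.put("value", "bar");

        // Correct: the record is wrapped in a list.
        Map<String, Object> good = new HashMap<>();
        List<Map<String, Object>> rawFields = new ArrayList<>();
        rawFields.add(entry);
        good.put("rawFields", rawFields);

        // Suspicious: a single record set directly on the repeated field.
        Map<String, Object> bad = new HashMap<>();
        bad.put("rawFields", entry);

        System.out.println(isRepeatedValue(good, "rawFields")); // true
        System.out.println(isRepeatedValue(bad, "rawFields"));  // false
    }
}
```

Logging the rows that fail such a check would show whether any code path sets rawFields to something other than a list.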