Inserting repeated records into BigQuery with Java API/Dataflow - "Repeated field must be imported as a JSON array"
I have data with repeated key-value (string, string) record pairs as one of the fields in my BigQuery table schema.
I'm trying to add these repeated records using the approach described here: http://sookocheff.com/post/bigquery/creating-a-big-query-table-java-api/
The table schema I create for the repeated record field looks like this:
// Repeated RECORD column holding key-value string pairs
TableFieldSchema column = new TableFieldSchema().setName("rawFields");
column.setType("RECORD");
List<TableFieldSchema> list = new ArrayList<>();
list.add(new TableFieldSchema().setName("key").setType("STRING"));
list.add(new TableFieldSchema().setName("value").setType("STRING"));
column.setFields(list);
column.setMode("REPEATED");
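The column then goes into the TableSchema that is passed to BigQueryIO.Write below as mySchema, along these lines (a simplified sketch; the real schema also declares the other row fields omitted from this question):
// Simplified sketch: wrap the repeated RECORD column into the table schema.
// The real mySchema also declares the other row fields omitted here.
List<TableFieldSchema> allFields = new ArrayList<>();
allFields.add(column);
TableSchema mySchema = new TableSchema().setFields(allFields);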
I'm inserting data like this as part of a DoFn:
Map<String,String> record = ... // key-value pairs
List<TableRow> rawFields = new ArrayList<>();
record.forEach((k,v)->
rawFields.add(new TableRow().set("key",k).set("value", v))
);
TableRow row = new TableRow();
// row has other fields, omitted here
row.set("rawFields", rawFields);
The DoFn sits in my Dataflow pipeline, just before the BigQueryIO.Write:
.apply(BigQueryIO.Write
.named("WriteLBLogLines")
.to("xxx:yyy.zzz")
.withSchema(mySchema)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
When I try to run this through Dataflow, I get the following error:
errorResult: JSON table encountered too many errors, giving up. Rows: 1; errors: 1., error: JSON table encountered too many errors, giving up. Rows: 1; errors: 1., error: JSON parsing error in row starting at position 0 at file: gs://xxxxxxxxxx/12271006010671464167/dax-tmp-2016-06-28_14_47_26-12271006010671462904-S04-1-303c4f638f6b411b/-shard-00002-of-00003-try-021aff4c448b3177-endshard.json. Repeated field must be imported as a JSON array. Field: rawFields.
What is wrong with my approach? It seems I'm not using the right method for inserting repeated records.

I tried to reproduce the problem with the following code, but it executed successfully. Is there perhaps a problem with some other aspect of your schema?
// Schema: a single repeated RECORD column of {key, value} strings
List<TableFieldSchema> fields = new ArrayList<>();
TableFieldSchema column = new TableFieldSchema().setName("rawFields");
column.setType("RECORD");
List<TableFieldSchema> list = new ArrayList<>();
list.add(new TableFieldSchema().setName("key").setType("STRING"));
list.add(new TableFieldSchema().setName("value").setType("STRING"));
column.setFields(list);
column.setMode("REPEATED");
fields.add(column);
TableSchema schema = new TableSchema().setFields(fields);
TableRow row = new TableRow();
List<TableRow> rawFields = new ArrayList<>();
rawFields.add(new TableRow().set("key","foo").set("value", "bar"));
row.set("rawFields", rawFields);
Pipeline p = Pipeline.create(options);
PCollection<TableRow> c =
    p.apply(Create.of(row, row).withCoder(TableRowJsonCoder.of()));
c.apply(BigQueryIO.Write.named("BigQuery-Write")
.to(options.getOutput())
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
.withSchema(schema));
p.run();
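If that repro also succeeds for you, one further thing worth checking is what your pipeline actually serializes (a standalone debugging sketch on my part, not code from either pipeline): TableRow extends GenericJson, so printing a row shows its JSON form, and rawFields must render as a JSON array of objects:
// Standalone debugging sketch (assumed helper, not from either pipeline):
// print a row's JSON form; "rawFields" should render as an array, e.g.
// {"rawFields":[{"key":"foo","value":"bar"}]}
import com.google.api.services.bigquery.model.TableRow;

import java.util.ArrayList;
import java.util.List;

public class InspectRow {
  public static void main(String[] args) throws Exception {
    List<TableRow> rawFields = new ArrayList<>();
    rawFields.add(new TableRow().set("key", "foo").set("value", "bar"));
    TableRow row = new TableRow().set("rawFields", rawFields);
    // toPrettyString() comes from GenericJson and may throw IOException.
    System.out.println(row.toPrettyString());
  }
}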