BigQuery in Dataflow fails to load data from Cloud Storage: JSON object specified for non-record field

I have a Dataflow pipeline running locally on my machine that writes to BigQuery. BigQuery in this batch job requires a temp location, and I provided one in my Cloud Storage bucket. The relevant parts are:

PipelineOptions options = PipelineOptionsFactory.create();
    options.as(BigQueryOptions.class)
            .setTempLocation("gs://folder/temp");
    Pipeline p = Pipeline.create(options);

....

List<TableFieldSchema> fields = new ArrayList<>();
      fields.add(new TableFieldSchema().setName("uuid").setType("STRING"));
      fields.add(new TableFieldSchema().setName("start_time").setType("TIMESTAMP"));
      fields.add(new TableFieldSchema().setName("end_time").setType("TIMESTAMP"));
      TableSchema schema = new TableSchema().setFields(fields);

session_windowed_items.apply(ParDo.of(new FormatAsTableRowFn()))
      .apply(BigQueryIO.Write
      .withSchema(schema)
      .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
      .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
      .to("myproject:db.table"));

where in FormatAsTableRowFn I have:

static class FormatAsTableRowFn extends DoFn<KV<String, String>, TableRow>
    implements RequiresWindowAccess {
  @Override
  public void processElement(ProcessContext c) {
    TableRow row = new TableRow()
        .set("uuid", c.element().getKey())
        // include a field for the window timestamp
        .set("start_time", ((IntervalWindow) c.window()).start().toInstant())  // NOTE: I tried both with and
        .set("end_time", ((IntervalWindow) c.window()).end().toInstant());     // without .toInstant, receiving the same error
    c.output(row);
  }
}

If I print out row.toString() I get legitimate timestamps:

{uuid=00:00:00:00:00:00, start_time=2016-09-22T07:34:38.000Z, end_time=2016-09-22T07:39:38.000Z}

When I run this code, Java says: Failed to create the load job beam_job_XXX

Manually checking the temp folder in GCS, the objects look like this:

{"mac":"00:00:00:00:00:00","start_time":{"millis":1474529678000,"chronology":{"zone":{"fixed":true,"id":"UTC"}},"zone":{"fixed":true,"id":"UTC"},"afterNow":false,"beforeNow":true,"equalNow":false},"end_time":{"millis":1474529978000,"chronology":{"zone":{"fixed":true,"id":"UTC"}},"zone":{"fixed":true,"id":"UTC"},"afterNow":false,"beforeNow":true,"equalNow":false}}

Looking at the failed job report in BigQuery, the error says:

JSON object specified for non-record field: start_time (error code: invalid)

This is weird, because I'm pretty sure I declared this field as a TIMESTAMP, and I'm 100% sure my schema in BigQuery matches the TableSchema from the SDK. (note: setting withCreateDisposition to CREATE_IF_NEEDED yields the same result)

Can someone tell me how I need to remedy this to get the data into BigQuery?

Don't use the Instant object. Try using milliseconds/seconds instead.

https://cloud.google.com/bigquery/data-types

A positive number specifies the number of seconds since the epoch

So, something like this should work:

.getMillis() / 1000
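Applied to the DoFn above, that would mean setting the fields as epoch seconds rather than Joda Instant objects, e.g. `.set("start_time", ((IntervalWindow) c.window()).start().getMillis() / 1000)`. A minimal sketch of the conversion, using the millis values from the serialized row in the question (the helper name `toEpochSeconds` is made up for illustration):

```java
public class TimestampFix {
    // BigQuery's TIMESTAMP type accepts a number of seconds since the epoch,
    // so divide Joda-Time epoch millis by 1000 before putting them in the TableRow.
    static long toEpochSeconds(long epochMillis) {
        return epochMillis / 1000;
    }

    public static void main(String[] args) {
        // start_time / end_time millis from the failing row above
        System.out.println(toEpochSeconds(1474529678000L)); // 2016-09-22T07:34:38Z
        System.out.println(toEpochSeconds(1474529978000L)); // 2016-09-22T07:39:38Z
    }
}
```

This way the JSON files written to the temp location contain plain numbers instead of serialized Joda Instant objects, which is what triggered the "JSON object specified for non-record field" error.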