BigQuery loads manually but not through the Java SDK
I have a Dataflow pipeline running locally. The objective is to read JSON files with TextIO, create sessions, and load them into BigQuery. Given this structure, I have to create a temp directory in GCS and then load from it into BigQuery. I previously had a schema error that prevented me from loading the data (see my earlier question); that issue has since been resolved.
So now, when I run the pipeline locally, it finishes by dumping the newline-delimited JSON temp files to GCS. The SDK then gives me the following:
Starting BigQuery load job beam_job_xxxx_00001-1: try 1/3
INFO [main] (BigQueryIO.java:2191) - BigQuery load job failed: beam_job_xxxx_00001-1
...
Exception in thread "main" com.google.cloud.dataflow.sdk.Pipeline$PipelineExecutionException: java.lang.RuntimeException: Failed to create the load job beam_job_xxxx_00001, reached max retries: 3
at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:187)
at pedesys.Dataflow.main(Dataflow.java:148)
Caused by: java.lang.RuntimeException: Failed to create the load job beam_job_xxxx_00001, reached max retries: 3
at com.google.cloud.dataflow.sdk.io.BigQueryIO$Write$WriteTables.load(BigQueryIO.java:2198)
at com.google.cloud.dataflow.sdk.io.BigQueryIO$Write$WriteTables.processElement(BigQueryIO.java:2146)
The error is not very descriptive, and the data still doesn't get loaded into BigQuery. What's puzzling is that if I go to the BigQuery UI and manually load the very same temp files that the SDK's Dataflow pipeline dumped to GCS, into the same table, it works fine.
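For reference, the manual load that succeeds can also be expressed programmatically. Below is a minimal Java sketch of the equivalent load job against the google-api-services-bigquery client; the class name, application name, and client construction are my own placeholders, and the source URI deliberately keeps the temp file path elided:

import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.BigqueryScopes;
import com.google.api.services.bigquery.model.*;
import java.util.Collections;

public class ManualLoad {
  public static void main(String[] args) throws Exception {
    Bigquery bigquery = new Bigquery.Builder(
        GoogleNetHttpTransport.newTrustedTransport(),
        JacksonFactory.getDefaultInstance(),
        GoogleCredential.getApplicationDefault().createScoped(BigqueryScopes.all()))
        .setApplicationName("manual-load")  // placeholder name
        .build();

    // Same configuration the BigQuery UI applies when loading the temp file.
    JobConfigurationLoad load = new JobConfigurationLoad()
        .setSourceUris(Collections.singletonList("gs://test/temp/..."))  // temp file left by the pipeline
        .setSourceFormat("NEWLINE_DELIMITED_JSON")
        .setDestinationTable(new TableReference()
            .setProjectId("myproject").setDatasetId("db").setTableId("table"))
        .setCreateDisposition("CREATE_IF_NEEDED")   // add .setSchema(...) if the table doesn't exist yet
        .setWriteDisposition("WRITE_APPEND");

    Job job = bigquery.jobs().insert("myproject",
        new Job().setConfiguration(new JobConfiguration().setLoad(load))).execute();
    System.out.println("Started job " + job.getJobReference().getJobId());
  }
}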
The relevant part of the code is below:
PipelineOptions options = PipelineOptionsFactory.create();
options.as(BigQueryOptions.class)
    .setTempLocation("gs://test/temp");
Pipeline p = Pipeline.create(options);
...
...
// Format each session into a TableRow and append it via a BigQuery load job.
session_windowed_items.apply(ParDo.of(new FormatAsTableRowFn()))
    .apply(BigQueryIO.Write
        .named("loadJob")
        .to("myproject:db.table")
        .withSchema(schema)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
    );
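Since "reached max retries" hides the underlying failure, one way to surface the real error is to ask BigQuery for the job's status directly. A minimal sketch, assuming the same google-api-services-bigquery client as in the sketch above (the "xxxx" part of the job id is elided in the log; substitute the real id the SDK prints):

import com.google.api.services.bigquery.model.ErrorProto;
import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobStatus;

// "beam_job_xxxx_00001-1" is the (partially elided) id from the log above.
Job failed = bigquery.jobs().get("myproject", "beam_job_xxxx_00001-1").execute();
JobStatus status = failed.getStatus();
if (status.getErrorResult() != null) {
  // errorResult holds the fatal error; errors lists every problem BigQuery saw.
  System.out.println(status.getErrorResult().getReason() + ": "
      + status.getErrorResult().getMessage());
  for (ErrorProto e : status.getErrors()) {
    System.out.println("  " + e.getLocation() + ": " + e.getMessage());
  }
}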