Why does my Dataflow output "timeout value is negative" on insertion to BigQuery?

I have a Dataflow job consisting of ReadSource, ParDo, Windowing, and Insert (into a date-partitioned table in BigQuery).

Basically it does the following (a rough sketch of such a pipeline follows the list):

  1. Reads text files from a Google Cloud Storage bucket using a glob
  2. Processes each line by splitting on a delimiter, changing some values before giving each column a name and data type, and outputs BigQuery table rows together with a timestamp based on the data
  3. Windows into daily windows using the timestamp from step 2
  4. Writes to BigQuery, using the window to pick the table and the "dataset$datepartition" syntax to specify the table and partition, with the create disposition set to CREATE_IF_NEEDED and the write disposition set to WRITE_APPEND.
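
For reference, here is a minimal sketch of what such a pipeline can look like against the Dataflow Java SDK 1.x API. The bucket path, delimiter, column names and table spec are made-up placeholders, and it assumes the per-window BigQueryIO.Write.to(SerializableFunction<BoundedWindow, String>) overload is what produces the "dataset$datepartition" table spec; it is not the exact code from the failing job.

```java
import java.util.Arrays;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.BigQueryIO;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.format.DateTimeFormat;

public class PartitionedBigQueryInsert {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Schema for the output table (placeholder columns).
    TableSchema schema = new TableSchema().setFields(Arrays.asList(
        new TableFieldSchema().setName("col_a").setType("STRING"),
        new TableFieldSchema().setName("event_time").setType("TIMESTAMP")));

    p.apply(TextIO.Read.from("gs://my-bucket/input/*.csv"))                        // step 1
     .apply(ParDo.of(new DoFn<String, TableRow>() {                                // step 2
        @Override
        public void processElement(ProcessContext c) {
          String[] fields = c.element().split(";");
          TableRow row = new TableRow()
              .set("col_a", fields[0])
              .set("event_time", fields[1]);
          // Emit each row with an event timestamp taken from the data so windowing can use it.
          c.outputWithTimestamp(row, Instant.parse(fields[1]));
        }
      }))
     .apply(Window.<TableRow>into(FixedWindows.of(Duration.standardDays(1))))      // step 3
     .apply(BigQueryIO.Write                                                       // step 4
        .to(new SerializableFunction<BoundedWindow, String>() {
          @Override
          public String apply(BoundedWindow window) {
            // Build the "dataset$datepartition" decorator from the window's start date.
            String day = DateTimeFormat.forPattern("yyyyMMdd")
                .print(((IntervalWindow) window).start());
            return "my-project:my_dataset.my_table$" + day;
          }
        })
        .withSchema(schema)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

    p.run();
  }
}
```

Note that writing through a per-window table function like this goes via BigQuery streaming inserts rather than load jobs, which appears consistent with the StreamWithDeDup/StreamingWriteFn frames in the stack trace below.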

The first three steps seem to run fine, but in most runs the job hits a problem at the final insert step, giving this exception in the logs:

java.lang.IllegalArgumentException: timeout value is negative at java.lang.Thread.sleep(Native Method) 
at com.google.cloud.dataflow.sdk.util.BigQueryTableInserter.insertAll(BigQueryTableInserter.java:287) 
at com.google.cloud.dataflow.sdk.io.BigQueryIO$StreamingWriteFn.flushRows(BigQueryIO.java:2446) 
at com.google.cloud.dataflow.sdk.io.BigQueryIO$StreamingWriteFn.finishBundle(BigQueryIO.java:2404) 
at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.finishBundle(DoFnRunnerBase.java:158) 
at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn.finishBundle(SimpleParDoFn.java:196) 
at com.google.cloud.dataflow.sdk.runners.worker.ForwardingParDoFn.finishBundle(ForwardingParDoFn.java:47) 
at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.finish(ParDoOperation.java:65) 
at com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:80) 
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.executeWork(DataflowWorker.java:287) 
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.doWork(DataflowWorker.java:223) 
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:173) 
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.doWork(DataflowWorkerHarness.java:193) 
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:173) 
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:160) 
at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745)

This exception is repeated ten times.

In the end I get "workflow failed" as follows:

Workflow failed. Causes: S04:Insert/DataflowPipelineRunner.BatchBigQueryIOWrite/BigQueryIO.StreamWithDeDup/Reshuffle/ 
GroupByKey/Read+Insert/DataflowPipelineRunner.BatchBigQueryIOWrite/BigQueryIO.StreamWithDeDup/Reshuffle/GroupByKey/
GroupByWindow+Insert/DataflowPipelineRunner.BatchBigQueryIOWrite/BigQueryIO.StreamWithDeDup/Reshuffle/
ExpandIterable+Insert/DataflowPipelineRunner.BatchBigQueryIOWrite/BigQueryIO.StreamWithDeDup/ParDo(StreamingWrite)
 failed.

Sometimes the same job with the same input works without any problems, which makes this very hard to debug. So where should I start?

This is a known issue with the BigQueryIO streaming write operation in the Dataflow SDK for Java, version 1.7.0. It has been fixed in GitHub HEAD, and the fix will be included in release 1.8.0 of the Dataflow Java SDK.

For more details, see Issue #451 on the DataflowJavaSDK GitHub repository.