Why does my Dataflow output "timeout value is negative" on insertion to BigQuery?
I have a Dataflow job consisting of ReadSource, ParDo, Windowing, and an Insert (into a date-partitioned table in BigQuery).
Basically it does the following (a rough sketch of the pipeline follows the list):
- Reads text files from a Google Storage bucket using a glob
- Processes each line by splitting it on a delimiter, changes some values before giving each column a name and a data type, and then outputs it as a BigQuery table row together with a timestamp derived from the data
- Windows the rows into daily windows using the timestamp from step 2
- Writes to BigQuery, using the window to pick the table and partition via the "dataset$datepartition" syntax. The create disposition is set to CREATE_IF_NEEDED and the write disposition to WRITE_APPEND.
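For reference, this is roughly what the pipeline looks like, written against the Dataflow Java SDK 1.x API. The bucket path, delimiter, column names, schema, and the date formatting of the partition decorator below are placeholders rather than the actual job:

import java.util.Arrays;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.BigQueryIO;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.CalendarWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import org.joda.time.Instant;
import org.joda.time.format.ISODateTimeFormat;

public class PartitionedInsertPipeline {
  public static void main(String[] args) {
    DataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(DataflowPipelineOptions.class);
    Pipeline p = Pipeline.create(options);

    // Placeholder schema; the real job has one field per parsed column.
    final TableSchema schema = new TableSchema().setFields(Arrays.asList(
        new TableFieldSchema().setName("user_id").setType("STRING"),
        new TableFieldSchema().setName("event_time").setType("TIMESTAMP")));

    p.apply(TextIO.Read.from("gs://my-bucket/input/*.csv"))           // step 1: read files matching a glob
     .apply(ParDo.of(new DoFn<String, TableRow>() {
       @Override
       public void processElement(ProcessContext c) {
         String[] fields = c.element().split(";");                    // step 2: split each line on a delimiter
         TableRow row = new TableRow()
             .set("user_id", fields[0])
             .set("event_time", fields[1]);
         // The timestamp comes from the data itself and later drives the daily windows.
         c.outputWithTimestamp(row, Instant.parse(fields[1]));
       }
     }))
     .apply(Window.<TableRow>into(CalendarWindows.days(1)))           // step 3: one window per calendar day
     .apply(BigQueryIO.Write
         .to(new SerializableFunction<BoundedWindow, String>() {      // step 4: table spec chosen per window
           @Override
           public String apply(BoundedWindow window) {
             String day = ISODateTimeFormat.basicDate()               // yyyyMMdd for the partition decorator
                 .print(((IntervalWindow) window).start());
             return "my-project:my_dataset.my_table$" + day;
           }
         })
         .withSchema(schema)
         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

    p.run();
  }
}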
The first three steps seem to run fine, but in most runs the job hits a problem in the final insert step and the log shows this exception:
java.lang.IllegalArgumentException: timeout value is negative
at java.lang.Thread.sleep(Native Method)
at com.google.cloud.dataflow.sdk.util.BigQueryTableInserter.insertAll(BigQueryTableInserter.java:287)
at com.google.cloud.dataflow.sdk.io.BigQueryIO$StreamingWriteFn.flushRows(BigQueryIO.java:2446)
at com.google.cloud.dataflow.sdk.io.BigQueryIO$StreamingWriteFn.finishBundle(BigQueryIO.java:2404)
at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.finishBundle(DoFnRunnerBase.java:158)
at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn.finishBundle(SimpleParDoFn.java:196)
at com.google.cloud.dataflow.sdk.runners.worker.ForwardingParDoFn.finishBundle(ForwardingParDoFn.java:47)
at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.finish(ParDoOperation.java:65)
at com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:80)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.executeWork(DataflowWorker.java:287)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.doWork(DataflowWorker.java:223)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:173)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.doWork(DataflowWorkerHarness.java:193)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:173)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:160)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
This exception is repeated ten times.
Finally I get a "workflow failed" message as follows:
Workflow failed. Causes: S04:Insert/DataflowPipelineRunner.BatchBigQueryIOWrite/BigQueryIO.StreamWithDeDup/Reshuffle/
GroupByKey/Read+Insert/DataflowPipelineRunner.BatchBigQueryIOWrite/BigQueryIO.StreamWithDeDup/Reshuffle/GroupByKey/
GroupByWindow+Insert/DataflowPipelineRunner.BatchBigQueryIOWrite/BigQueryIO.StreamWithDeDup/Reshuffle/
ExpandIterable+Insert/DataflowPipelineRunner.BatchBigQueryIOWrite/BigQueryIO.StreamWithDeDup/ParDo(StreamingWrite)
failed.
Sometimes the same job with the same input runs without any problem, which makes this very hard to debug. So where should I start?
This is a known issue with the BigQueryIO streaming write operation in the Dataflow SDK for Java 1.7.0. It has been fixed in the GitHub HEAD, and the fix will be included in the 1.8.0 release of the Dataflow Java SDK.
For more details, see Issue #451 on the DataflowJavaSDK GitHub repository.
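Once 1.8.0 is available, bumping the SDK dependency in your build should pick up the fix. A minimal sketch, assuming a Maven build and the standard google-cloud-dataflow-java-sdk-all artifact (adjust to your build tool and version):

<dependency>
  <groupId>com.google.cloud.dataflow</groupId>
  <artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
  <!-- 1.8.0 is the first release expected to contain the fix for issue #451 -->
  <version>1.8.0</version>
</dependency>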