数据流错误 Broken pipeline 错误和 503 service unavailable 错误

dataflow error Broken pipeline error and 503 service unavailable error

运行 几个小时后,我在数据流管道中收到以下错误,我认为这会导致丢失一些数据。我基本上失去了过去几个月的数据,但管道成功完成,它又花了 4 个小时才完成(这次是 12 小时)。

(c1e6e4a686086ce4): java.io.IOException: com.google.api.client.googleapis.json.GoogleJsonResponseException: 503 服务在 com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel.waitForCompletionAndThrowIfUploadFailed(AbstractGoogleAsyncWriteChannel.java:431) 在 com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel.close(AbstractGoogleAsyncWriteChannel.java:289) 在 com.google.cloud.dataflow.sdk.runners.worker.TextSink$TextFileWriter.close(TextSink.java:243) 在 com.google.cloud.dataflow.sdk.util.common.worker.WriteOperation.finish(WriteOperation.java:100) 在 com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77) 在 com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.executeWork(DataflowWorker.java:254) 在 com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.doWork(DataflowWorker.java:191) 在 com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:144)在 com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.doWork(DataflowWorkerHarness.java:180) 在 com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:161) 在 com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:148) 在 java.util.concurrent.FutureTask.run(FutureTask.java:266) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 在 java.util.concurrent.ThreadPoolExecutor$ Worker.run(ThreadPoolExecutor.java:617) 在 java.lang.Thread.run(Thread.java:745) 原因:com.google.api.client.googleapis.json.GoogleJsonResponseException: 503 服务在 com.google.api.client.googleapis.json.GoogleJsonResponseException.from( GoogleJsonResponseException.java:145) 在 com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:113) 在 com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:40) 在 com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java :432) 在 com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352) 在 com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469) 在 com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel$UploadOperation.call(AbstractGoogleAsyncWriteChannel.java:357) ... 还有 4 个


(327e81fa21383d97): java.io.IOException: java.io.IOException: 管道在 com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel.waitForCompletionAndThrowIfUploadFailed(AbstractGoogleAsyncWriteChannel.java:431) 在 com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel.close(AbstractGoogleAsyncWriteChannel.java :289) 在 com.google.cloud.dataflow.sdk.runners.worker.TextSink$TextFileWriter.close(TextSink.java:243) 在 com.google.cloud.dataflow.sdk.util.common.worker.WriteOperation.finish(WriteOperation.java:100) 在 com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77) 在 com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.executeWork(DataflowWorker.java:254) 在 com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.doWork(DataflowWorker.java:191) 在 com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:144) 在com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.doWork(DataflowWorkerHarness.java:180) 在 com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:161) 在 com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:148) 在 java.util.concurrent.FutureTask.run(FutureTask.java:266) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 在 java.lang.Thread.run(Thread.java:745) 原因:java.io.IOException:管道在 java.io.PipedInputStream.read(PipedInputStream.java:321) 在 java.io.PipedInputStream.read(PipedInputStream.java:377) 在 com.google.api.client.util.ByteStreams.read(ByteStreams.java:181) 在 com.google.api.client.googleapis.media.MediaHttpUploader.setContentAndHeadersOnCurrentRequest(MediaHttpUploader.java:629) 在com.google.api.client.googleapis.media.MediaHttpUploader.resumableUpload(MediaHttpUploader.java:409) 在 com.google.api.client.googleapis.media.MediaHttpUploader.upload(MediaHttpUploader.java:336) 在 com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:427) 在 com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed (AbstractGoogleClientRequest.java:352) 在 com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469) 在 com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel$UploadOperation.call(AbstractGoogleAsyncWriteChannel.java:357) ... 还有 4 个

这里的问题是存储中的一些旧记录具有旧的 schemaVersion。这些未被清理,因此在使用包含旧架构版本的范围时管道失败。