Beam Sql Failing While Running with Dataflow Runner
While testing Beam SQL, I used the example model class from GitHub. The POJO example runs fine on my local machine (DirectRunner), but fails with an exception when run with the DataflowRunner.
Exception:
java.lang.IllegalArgumentException: Unable to encode element 'com.test.Customer1@523377ea' with coder 'org.apache.beam.sdk.schemas.SchemaCoder@2574fe3c'.
at org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:300)
at org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291)
at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:564)
at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:480)
at com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(IntrinsicMapTaskExecutorFactory.java:398)
at com.google.cloud.dataflow.worker.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:124)
at com.google.cloud.dataflow.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:63)
at com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:42)
at com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:200)
at com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:158)
at com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:75)
at com.google.cloud.dataflow.worker.BatchDataflowWorker.executeWork(BatchDataflowWorker.java:393)
at com.google.cloud.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:362)
at com.google.cloud.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:290)
at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:134)
at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:114)
at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:101)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer
at org.apache.beam.sdk.coders.BigEndianIntegerCoder.encode(BigEndianIntegerCoder.java:30)
at org.apache.beam.sdk.coders.RowCoderGenerator$EncodeInstruction.encodeDelegate(RowCoderGenerator.java:206)
at org.apache.beam.sdk.coders.Coder$ByteBuddy$BXCW8AHn.encode(Unknown Source)
at org.apache.beam.sdk.coders.Coder$ByteBuddy$BXCW8AHn.encode(Unknown Source)
at org.apache.beam.sdk.coders.RowCoder.encode(RowCoder.java:105)
at org.apache.beam.sdk.schemas.SchemaCoder.encode(SchemaCoder.java:82)
at org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:297)
... 20 more
This is actually a subtle bug in the Beam schema code that only manifests in multi-worker setups, which is why the test passes on the DirectRunner. The fix is in https://github.com/apache/beam/pull/6218 and will be merged once it has been reviewed.
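The `ClassCastException` at the bottom of the trace (`java.lang.String cannot be cast to java.lang.Integer` inside `BigEndianIntegerCoder`) is what you see when a row's values and the per-field coders generated from its schema disagree on field order. The following plain-Java sketch is a hypothetical illustration of that failure mode, not Beam's actual generated code; the `FieldCoder` interface and coder constants are invented for the demo:

```java
import java.util.List;

// Illustration of the failure mode behind the stack trace: values and
// per-field coders are paired by index, so if the coder list is built in a
// different field order than the row, a String lands in an Integer coder.
public class CoderOrderDemo {
    // Stand-in for the per-field coders a row-coder generator would produce.
    interface FieldCoder { byte[] encode(Object value); }

    static final FieldCoder INT_CODER = value -> {
        int v = (Integer) value; // throws ClassCastException for a String
        return new byte[] {(byte) (v >>> 24), (byte) (v >>> 16), (byte) (v >>> 8), (byte) v};
    };

    static final FieldCoder STRING_CODER = value -> ((String) value).getBytes();

    // Encode each value with the coder at the same index.
    static void encodeRow(List<Object> values, List<FieldCoder> coders) {
        for (int i = 0; i < values.size(); i++) {
            coders.get(i).encode(values.get(i));
        }
    }

    public static void main(String[] args) {
        List<Object> row = List.<Object>of("Alice", 42); // (name, id)

        // Coders paired in the row's field order: encoding succeeds.
        encodeRow(row, List.of(STRING_CODER, INT_CODER));

        // Coders derived in a different field order: the same row now fails,
        // mirroring the "String cannot be cast to Integer" in the question.
        try {
            encodeRow(row, List.of(INT_CODER, STRING_CODER));
        } catch (ClassCastException expected) {
            System.out.println("caught ClassCastException");
        }
    }
}
```

The DirectRunner can mask this class of bug because encoding may never round-trip between workers, whereas Dataflow always serializes elements between stages.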