java.io.IOException: INVALID_ARGUMENT: unable to parse key at com.google.cloud.dataflow.sdk.runners.worker.ApplianceShuffleWriter.write
I am getting the following exception when running some jobs that read from g3 and then group the data by key.
The exception occurs during the read.
java.io.IOException: INVALID_ARGUMENT: unable to parse key
    at com.google.cloud.dataflow.sdk.runners.worker.ApplianceShuffleWriter.write(Native Method)
    at com.google.cloud.dataflow.sdk.runners.worker.ShuffleSink$ShuffleSinkWriter.outputChunk(ShuffleSink.java:293)
    at com.google.cloud.dataflow.sdk.runners.worker.ShuffleSink$ShuffleSinkWriter.close(ShuffleSink.java:288)
    at com.google.cloud.dataflow.sdk.util.common.worker.WriteOperation.finish(WriteOperation.java:100)
    at com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:79)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.executeWork(DataflowWorker.java:288)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.doWork(DataflowWorker.java:221)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:173)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.doWork(DataflowWorkerHarness.java:193)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:173)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:160)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Any ideas?
This exception is thrown when you apply GroupByKey and some of the keys are null.
The following code throws the exception:
pCollection
    .apply(ParDo.of(new DoFn<MyObject, KV<String, MyObject>>() {
        @Override
        public void processElement(ProcessContext c) throws Exception {
            // The null key cannot be written to the shuffle, so GroupByKey fails.
            c.output(KV.<String, MyObject>of(null, c.element()));
        }
    }))
    .apply(GroupByKey.<String, MyObject>create());
You cannot write null keys.
So, when your key can be null, you have to do something like this:
pCollection
    .apply(ParDo.of(new DoFn<MyObject, KV<String, MyObject>>() {
        @Override
        public void processElement(ProcessContext c) throws Exception {
            String key = c.element().getKeyField();
            if (key == null) {
                // Handle the missing key somehow...
                key = ... // not null value
            }
            // The key is guaranteed to be non-null at this point.
            c.output(KV.of(key, c.element()));
        }
    }))
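To see the effect of the null check in isolation, here is a minimal plain-Java sketch that does not use the Dataflow SDK at all. It assumes that substituting a placeholder key is acceptable for your data; the class name NullKeyGrouping, the helper normalizeKey, and the placeholder value "__missing_key__" are all hypothetical names chosen for illustration, and the in-memory map only stands in for what GroupByKey would do.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class NullKeyGrouping {
    // Hypothetical placeholder; pick a value that cannot collide with a real key.
    static final String MISSING_KEY = "__missing_key__";

    /** Returns the key unchanged, or the placeholder when the key is null. */
    static String normalizeKey(String key) {
        return key == null ? MISSING_KEY : key;
    }

    /** Groups {key, value} pairs by normalized key, keeping first-seen key order. */
    static Map<String, List<String>> groupByNormalizedKey(List<String[]> pairs) {
        Map<String, List<String>> groups = new LinkedHashMap<>();
        for (String[] pair : pairs) {
            String key = normalizeKey(pair[0]);
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(pair[1]);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String[]> pairs = Arrays.asList(
            new String[] {"a", "1"},
            new String[] {null, "2"},
            new String[] {"a", "3"},
            new String[] {null, "4"});
        // Prints {a=[1, 3], __missing_key__=[2, 4]}
        System.out.println(groupByNormalizedKey(pairs));
    }
}
```

Note that every record without a key ends up in the same group. If those records should not be grouped together, it may be better to drop them, or route them to a separate output, before the GroupByKey step.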