java.io.IOException: INVALID_ARGUMENT: unable to parse key at com.google.cloud.dataflow.sdk.runners.worker.ApplianceShuffleWriter.write

I get the following exception when running some jobs that read from g3 and then group the data by key. The exception occurs during the read.

java.io.IOException: INVALID_ARGUMENT: unable to parse key
    at com.google.cloud.dataflow.sdk.runners.worker.ApplianceShuffleWriter.write(Native Method)
    at com.google.cloud.dataflow.sdk.runners.worker.ShuffleSink$ShuffleSinkWriter.outputChunk(ShuffleSink.java:293)
    at com.google.cloud.dataflow.sdk.runners.worker.ShuffleSink$ShuffleSinkWriter.close(ShuffleSink.java:288)
    at com.google.cloud.dataflow.sdk.util.common.worker.WriteOperation.finish(WriteOperation.java:100)
    at com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:79)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.executeWork(DataflowWorker.java:288)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.doWork(DataflowWorker.java:221)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:173)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.doWork(DataflowWorkerHarness.java:193)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:173)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:160)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Any ideas?

This exception is thrown when you apply a GroupByKey and some of the keys are null.

This code throws the exception, because every element is emitted with a null key:

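// MyObject is a placeholder for your element type; pCollection is a PCollection<MyObject>.
pCollection
            .apply(ParDo.of(new DoFn<MyObject, KV<String, MyObject>>() {
                @Override
                public void processElement(ProcessContext c) throws Exception {
                    // Every element gets a null key, which the GroupByKey shuffle cannot handle.
                    c.output(KV.<String, MyObject>of(null, c.element()));
                }
            }))
            .apply(GroupByKey.<String, MyObject>create());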
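To confirm that null keys are what reaches the GroupByKey, you can scan a sample of the keyed data for them before grouping. The sketch below is plain Java with no Dataflow SDK involved; the class `NullKeyCheck`, the method `nullKeyPositions`, and the use of `Map.Entry` as a stand-in for `KV` are hypothetical, for illustration only.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Plain-Java sketch (no Dataflow SDK): returns the positions of key/value pairs
// whose key is null, i.e. the pairs a GroupByKey could not shuffle.
// Map.Entry stands in for Dataflow's KV here (an assumption for illustration).
public class NullKeyCheck {
    public static <K, V> List<Integer> nullKeyPositions(List<Map.Entry<K, V>> pairs) {
        List<Integer> positions = new ArrayList<>();
        for (int i = 0; i < pairs.size(); i++) {
            if (pairs.get(i).getKey() == null) {
                positions.add(i);
            }
        }
        return positions;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> pairs = new ArrayList<>();
        pairs.add(new AbstractMap.SimpleEntry<String, String>("a", "x"));
        pairs.add(new AbstractMap.SimpleEntry<String, String>(null, "y"));
        pairs.add(new AbstractMap.SimpleEntry<String, String>("b", "z"));
        System.out.println(nullKeyPositions(pairs)); // prints [1]
    }
}
```

In a real pipeline the same null test would sit inside a DoFn before the GroupByKey, for example to log or count offending elements.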

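You can't write null keys to the shuffle. So if your key can be null, you have to replace it with a non-null value before grouping, like this: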

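import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.values.KV;

// MyObject and getKeyField() are placeholders for your own element type and key accessor.
pCollection
            .apply(ParDo.of(new DoFn<MyObject, KV<String, MyObject>>() {
                @Override
                public void processElement(ProcessContext c) throws Exception {
                    String key = c.element().getKeyField();
                    if (key == null) {
                        // Handle null keys however suits your data. Here they all map to one
                        // hypothetical sentinel, so they still group together.
                        key = "__NULL_KEY__";
                    }
                    c.output(KV.of(key, c.element()));
                }
            }))
            .apply(GroupByKey.<String, MyObject>create());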
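The null-key substitution in the DoFn above can be tried out without running a Dataflow job. This is a plain-Java sketch under stated assumptions: `NullKeyGrouping`, `normalizeKey`, the sentinel `"__NULL_KEY__"`, and the in-memory `groupByKey` (a `LinkedHashMap` standing in for Dataflow's `GroupByKey`) are all hypothetical, and `Map.Entry` stands in for `KV`.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch (no Dataflow SDK): replace null keys with a sentinel, then group.
public class NullKeyGrouping {
    // Hypothetical sentinel; choose a value that cannot collide with a real key.
    public static final String NULL_KEY_SENTINEL = "__NULL_KEY__";

    // Same role as the null check in the DoFn: no null key reaches the grouping step.
    public static String normalizeKey(String key) {
        return key == null ? NULL_KEY_SENTINEL : key;
    }

    // In-memory stand-in for GroupByKey: collects values per normalized key,
    // keeping keys in first-seen order.
    public static <V> Map<String, List<V>> groupByKey(List<Map.Entry<String, V>> pairs) {
        Map<String, List<V>> groups = new LinkedHashMap<>();
        for (Map.Entry<String, V> pair : pairs) {
            groups.computeIfAbsent(normalizeKey(pair.getKey()), k -> new ArrayList<>())
                  .add(pair.getValue());
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        pairs.add(new AbstractMap.SimpleEntry<String, Integer>("a", 1));
        pairs.add(new AbstractMap.SimpleEntry<String, Integer>(null, 2));
        pairs.add(new AbstractMap.SimpleEntry<String, Integer>("a", 3));
        pairs.add(new AbstractMap.SimpleEntry<String, Integer>(null, 4));
        System.out.println(groupByKey(pairs)); // prints {a=[1, 3], __NULL_KEY__=[2, 4]}
    }
}
```

Mapping every null key to one sentinel keeps those elements together in a single group; if they are not meaningful, dropping them in the DoFn (simply not calling `c.output`) is the other obvious way to handle them.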