CoderException: java.io.EOFException when performing GroupByKey on Json values encoded with a CustomCoder using Jackson

Why do I get this EOFException when running the following code?

I have used GroupByKey successfully in simpler cases. What seems to trigger the error is the use of a custom coder (for JSON objects). Can anyone explain why this happens?

Here is the error:

com.google.cloud.dataflow.sdk.Pipeline$PipelineExecutionException: com.google.cloud.dataflow.sdk.coders.CoderException: java.io.EOFException

    at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:186)
    at com.google.cloud.dataflow.sdk.testing.TestPipeline.run(TestPipeline.java:106)
    at com.example.dataflow.TestGroupByKeyCustomCoder.testPipeline(TestGroupByKeyCustomCoder.java:85)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.junit.runners.model.FrameworkMethod.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner.evaluate(ParentRunner.java:268)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:119)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:42)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:234)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:74)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: com.google.cloud.dataflow.sdk.coders.CoderException: java.io.EOFException
    at com.google.cloud.dataflow.sdk.coders.BigEndianLongCoder.decode(BigEndianLongCoder.java:62)
    at com.google.cloud.dataflow.sdk.coders.InstantCoder.decode(InstantCoder.java:83)
    at com.google.cloud.dataflow.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:621)
    at com.google.cloud.dataflow.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:553)
    at com.google.cloud.dataflow.sdk.coders.KvCoder.decode(KvCoder.java:98)
    at com.google.cloud.dataflow.sdk.coders.KvCoder.decode(KvCoder.java:42)
    at com.google.cloud.dataflow.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:157)
    at com.google.cloud.dataflow.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:140)
    at com.google.cloud.dataflow.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:134)
    at com.google.cloud.dataflow.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:107)
    at com.google.cloud.dataflow.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:44)
Caused by: java.io.EOFException
    at java.io.DataInputStream.readFully(DataInputStream.java:197)
    at java.io.DataInputStream.readLong(DataInputStream.java:416)
    at com.google.cloud.dataflow.sdk.coders.BigEndianLongCoder.decode(BigEndianLongCoder.java:58)
    at com.google.cloud.dataflow.sdk.coders.InstantCoder.decode(InstantCoder.java:83)
    at com.google.cloud.dataflow.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:621)
    at com.google.cloud.dataflow.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:553)
    at com.google.cloud.dataflow.sdk.coders.KvCoder.decode(KvCoder.java:98)
    at com.google.cloud.dataflow.sdk.coders.KvCoder.decode(KvCoder.java:42)
    at com.google.cloud.dataflow.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:157)
    at com.google.cloud.dataflow.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:140)
    at com.google.cloud.dataflow.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:134)
    at com.google.cloud.dataflow.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:107)
    at com.google.cloud.dataflow.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:44)
    at com.google.cloud.dataflow.sdk.transforms.ParDo$ImmutabilityCheckingOutputManager.output(ParDo.java:1303)
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContext.outputWindowedValue(DoFnRunnerBase.java:287)
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnProcessContext.output(DoFnRunnerBase.java:449)
    at com.google.cloud.dataflow.sdk.util.ReifyTimestampAndWindowsDoFn.processElement(ReifyTimestampAndWindowsDoFn.java:38)
    at com.google.cloud.dataflow.sdk.util.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:49)
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.processElement(DoFnRunnerBase.java:138)
    at com.google.cloud.dataflow.sdk.transforms.ParDo.evaluateHelper(ParDo.java:1229)
    at com.google.cloud.dataflow.sdk.transforms.ParDo.evaluateSingleHelper(ParDo.java:1098)
    at com.google.cloud.dataflow.sdk.transforms.ParDo.access$000(ParDo.java:457)
    at com.google.cloud.dataflow.sdk.transforms.ParDo.evaluate(ParDo.java:1084)
    at com.google.cloud.dataflow.sdk.transforms.ParDo.evaluate(ParDo.java:1079)
    at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:858)
    at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
    at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
    at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
    at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
    at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102)
    at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:259)
    at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:814)
    at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:526)
    at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:96)
    at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180)
    at com.google.cloud.dataflow.sdk.testing.TestPipeline.run(TestPipeline.java:106)
    at com.example.dataflow.TestGroupByKeyCustomCoder.testPipeline(TestGroupByKeyCustomCoder.java:85)

Here is the code:

package com.example.dataflow;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.cloud.dataflow.sdk.coders.CustomCoder;
import com.google.cloud.dataflow.sdk.testing.CoderProperties;
import com.google.cloud.dataflow.sdk.testing.TestPipeline;
import com.google.cloud.dataflow.sdk.transforms.*;
import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Test;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;


class ParseJson extends DoFn<String, JsonNode> {

    private static final long serialVersionUID = 1L;
    private transient ObjectMapper om;

    { init(); }

    private void init() {
        om = new ObjectMapper();
    }

    private void readObject(java.io.ObjectInputStream in)
            throws IOException, ClassNotFoundException {
        init();
    }

    @Override
    public void processElement(ProcessContext c) throws Exception {
        JsonNode node = om.readTree(c.element());
        c.output(node);
    }
}

class JsonNodeCoder extends CustomCoder<JsonNode> {

    private static final long serialVersionUID = 1L;

    private ObjectMapper mapper = new ObjectMapper();

    private static final JsonNodeCoder INSTANCE = new JsonNodeCoder();

    public static JsonNodeCoder of() {
        return INSTANCE;
    }

    @Override
    public void encode(JsonNode value, OutputStream outStream, Context context) throws IOException {
        mapper.configure(JsonGenerator.Feature.AUTO_CLOSE_TARGET, false).writeValue(outStream, value);
    }

    @Override
    public JsonNode decode(InputStream inStream, Context context) throws IOException {
        return mapper.configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, false).readTree(inStream);
    }
}

public class TestGroupByKeyCustomCoder {

    @Test // original code that produces the error
    public void testPipeline() throws IOException {

        TestPipeline p = TestPipeline.create();

        p.getCoderRegistry().registerCoder(JsonNode.class, JsonNodeCoder.class);

        p.apply(Create.of("{}"))
                .apply(ParDo.of(new ParseJson()))
                .apply(WithKeys.of("foo"))
                .apply("GroupByAction", GroupByKey.create());

        p.run();
    }

    // Test as per Kenn Knowles' suggestion
    // this throws the same error
    @Test
    public void testCustomCoder() throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        JsonNode value = mapper.readTree("{}");

        WindowedValue.FullWindowedValueCoder<JsonNode> windowedValueCoder
                = WindowedValue.FullWindowedValueCoder
                    .of(JsonNodeCoder.of(), GlobalWindow.Coder.INSTANCE);

        WindowedValue<JsonNode> x = WindowedValue.of(
                value, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.ON_TIME_AND_ONLY_FIRING);
        CoderProperties.coderDecodeEncodeEqual(windowedValueCoder, x);
    }
}

The problem seems to be that readTree consumes too much of the input and therefore swallows the timestamp that Dataflow is looking for:

@Test
public void testJackson() throws IOException {
    ObjectMapper mapper = new ObjectMapper();
    ByteArrayInputStream bis = new ByteArrayInputStream("{}1".getBytes());
    mapper.readTree(bis);
    Assert.assertNotEquals(bis.read(), -1); // assertion fails
}
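
For what it's worth, the bytes disappear because Jackson's streaming parser reads ahead into an internal buffer rather than consuming the input one token at a time. A hypothetical variant of the test above (not part of the original post) recovers the swallowed byte with JsonParser#releaseBuffered, which pushes buffered-but-unparsed input back out:

@Test
public void testJacksonReadAhead() throws IOException {
    ObjectMapper mapper = new ObjectMapper();
    ByteArrayInputStream bis = new ByteArrayInputStream("{}1".getBytes());

    // Parse through an explicit JsonParser so the read-ahead buffer is accessible.
    JsonParser parser = mapper.getFactory().createParser(bis);
    mapper.readTree(parser);

    // The trailing '1' is gone from bis, but it is still sitting in the parser's buffer.
    java.io.ByteArrayOutputStream leftover = new java.io.ByteArrayOutputStream();
    parser.releaseBuffered(leftover);
    Assert.assertEquals("1", leftover.toString());
}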

The stack trace indicates that the end of the stream was reached while decoding the big-endian long that holds the timestamp.

The encoding used by WindowedValue.FullWindowedValueCoder is your encoded value, followed by the timestamp, then the windows, and finally the pane metadata. This in turn means that your JsonNodeCoder consumed too many bytes from the input stream (perhaps all of them?), so decoding the timestamp ran off the end of the stream.

The SDK provides a number of utilities for testing coders in CoderProperties. You can actually test this case directly, in the global window, by running CoderProperties#coderDecodeEncodeEqual on the coder WindowedValue.FullWindowedValueCoder.of(JsonNodeCoder.of(), new GlobalWindow.Coder()).

You may have to pay attention to the flag passed to encode and decode: the Coder.Context.

  • Coder.Context.OUTER indicates that your coder is the outermost Coder and owns the whole stream. In that case, when encoding you can rely on the EOF signal and omit metadata such as length prefixes or brackets, and when decoding you are free to consume the rest of the stream.
  • Coder.Context.NESTED indicates that your Coder encodes only a portion of the value, so it needs to write enough metadata that it can later consume exactly the bytes it encoded and no more.
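
To make that work here, one possible approach (a minimal sketch, not from the original answer; SafeJsonNodeCoder is a hypothetical name) is to delegate the framing to StringUtf8Coder, which already writes a length prefix in the NESTED context, so the JSON payload is delimited and readTree never touches the timestamp bytes that follow:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.cloud.dataflow.sdk.coders.CustomCoder;
import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

class SafeJsonNodeCoder extends CustomCoder<JsonNode> {

    private static final long serialVersionUID = 1L;
    private static final SafeJsonNodeCoder INSTANCE = new SafeJsonNodeCoder();

    public static SafeJsonNodeCoder of() {
        return INSTANCE;
    }

    // ObjectMapper is not serializable, so rebuild it lazily after deserialization.
    private transient ObjectMapper mapper;

    private ObjectMapper mapper() {
        if (mapper == null) {
            mapper = new ObjectMapper();
        }
        return mapper;
    }

    @Override
    public void encode(JsonNode value, OutputStream outStream, Context context) throws IOException {
        // Render the tree to a String and let StringUtf8Coder do the framing:
        // a length prefix in the NESTED context, raw bytes in the OUTER context.
        StringUtf8Coder.of().encode(mapper().writeValueAsString(value), outStream, context);
    }

    @Override
    public JsonNode decode(InputStream inStream, Context context) throws IOException {
        // StringUtf8Coder reads back exactly the bytes it wrote, so the timestamp,
        // window and pane bytes that follow the value remain untouched.
        return mapper().readTree(StringUtf8Coder.of().decode(inStream, context));
    }
}

With the value delimited like this, the coderDecodeEncodeEqual round trip in testCustomCoder above should pass, and the GroupByKey pipeline should no longer fail with an EOFException while decoding the timestamp.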