Beam Java SDK 2.4/2.5 PAssert with CoGroupByKey
Beam Java SDK 2.4/2.5 PAssert with CoGroupByKey
我可能遗漏了一些明显的东西,但出于某种原因我无法让 PAssert
& TestPipeline
与 CoGroupByKey
一起工作——但没有它,它工作正常。
这是一个参考测试文件,可以重现我面临的问题。
我测试了 beam sdk 2.4 和 2.5。
为了比较,testWorking
按预期工作,testBroken
有一个额外的步骤,如下所示:
// The following four lines causes an issue.
PCollectionTuple tuple =
KeyedPCollectionTuple.of(inTag1, pc1.apply("String to KV<String, String>", ParDo.of(new String2KV())))
.and(inTag2, pc2).apply("CoGroupByKey", CoGroupByKey.<String>create()).apply("Some Merge DoFn",
ParDo.of(new MergeDoFn(inTag1, inTag2, outTag2)).withOutputTags(outTag1, TupleTagList.of(outTag2)));
我得到的错误可以在下面的代码之后找到。
public class ReferenceTest {
@Rule
public final transient TestPipeline pipe1 = TestPipeline.create();
@Rule
public final transient TestPipeline pipe2 = TestPipeline.create();
public static class String2KV extends DoFn<String, KV<String, String>> {
@ProcessElement
public void processElement(ProcessContext c) {
// "key1:value1" -> ["key1", "value1"]
String[] tokens = c.element().split(":");
c.output(KV.of(tokens[0], tokens[1]));
}
}
public static class MergeDoFn extends DoFn<KV<String, CoGbkResult>, String> {
final TupleTag<String> inTag1;
final TupleTag<String> inTag2;
final TupleTag<String> outTag2;
public MergeDoFn(TupleTag<String> inTag1, TupleTag<String> inTag2, TupleTag<String> outTag2) {
this.inTag1 = inTag1;
this.inTag2 = inTag2;
this.outTag2 = outTag2;
}
@ProcessElement
public void processElement(ProcessContext c) {
String val1 = c.element().getValue().getOnly(inTag1);
String val2 = c.element().getValue().getOnly(inTag2);
// outTag1 = main output
// outTag2 = side output
c.output(outTag2, val1 + "," + val2);
}
}
@Test
public void testWorking() {
// Create two PCs for test.
PCollection<String> pc1 =
pipe1.apply("create pc1", Create.<String>of("key1:value1").withCoder(StringUtf8Coder.of()));
PCollection<KV<String, String>> pc2 =
pipe1.apply("create pc2", Create.<KV<String, String>>of(KV.of("key1", "key1:value2"))
.withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())));
// Sanity check.
PAssert.that(pc1).containsInAnyOrder("key1:value1");
PAssert.that(pc2).containsInAnyOrder(KV.of("key1", "key1:value2"));
pipe1.run();
}
@Test
public void testBroken() {
// Create two PCs for test.
PCollection<String> pc1 =
pipe2.apply("create pc1", Create.<String>of("key1:value1").withCoder(StringUtf8Coder.of()));
PCollection<KV<String, String>> pc2 =
pipe2.apply("create pc2", Create.<KV<String, String>>of(KV.of("key1", "key1:value2"))
.withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())));
// Sanity check.
PAssert.that(pc1).containsInAnyOrder("key1:value1");
PAssert.that(pc2).containsInAnyOrder(KV.of("key1", "key1:value2"));
TupleTag<String> inTag1 = new TupleTag<String>() {
private static final long serialVersionUID = 1L;
};
TupleTag<String> inTag2 = new TupleTag<String>() {
private static final long serialVersionUID = 1L;
};
TupleTag<String> outTag1 = new TupleTag<String>() {
private static final long serialVersionUID = 1L;
};
TupleTag<String> outTag2 = new TupleTag<String>() {
private static final long serialVersionUID = 1L;
};
// The following four lines causes an issue.
PCollectionTuple tuple =
KeyedPCollectionTuple.of(inTag1, pc1.apply("String to KV<String, String>", ParDo.of(new String2KV())))
.and(inTag2, pc2).apply("CoGroupByKey", CoGroupByKey.<String>create()).apply("Some Merge DoFn",
ParDo.of(new MergeDoFn(inTag1, inTag2, outTag2)).withOutputTags(outTag1, TupleTagList.of(outTag2)));
// Without the following two PAsserts, the CoGBK step above seems to cause an issue.
PAssert.that(tuple.get(outTag1)).empty();
PAssert.that(tuple.get(outTag2)).containsInAnyOrder("value1,value2");
pipe2.run();
}
}
这是错误:
java.lang.IllegalArgumentException: unable to serialize org.apache.beam.sdk.transforms.join.CoGbkResult$CoGbkResultCoder@217aa4f
at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:57)
at org.apache.beam.runners.direct.repackaged.runners.core.construction.CoderTranslation.toCustomCoder(CoderTranslation.java:121)
at org.apache.beam.runners.direct.repackaged.runners.core.construction.CoderTranslation.toProto(CoderTranslation.java:85)
at org.apache.beam.runners.direct.repackaged.runners.core.construction.SdkComponents.registerCoder(SdkComponents.java:183)
at org.apache.beam.runners.direct.repackaged.runners.core.construction.CoderTranslation.registerComponents(CoderTranslation.java:107)
at org.apache.beam.runners.direct.repackaged.runners.core.construction.CoderTranslation.toKnownCoder(CoderTranslation.java:91)
at org.apache.beam.runners.direct.repackaged.runners.core.construction.CoderTranslation.toProto(CoderTranslation.java:83)
at org.apache.beam.runners.direct.repackaged.runners.core.construction.SdkComponents.registerCoder(SdkComponents.java:183)
at org.apache.beam.runners.direct.repackaged.runners.core.construction.PCollectionTranslation.toProto(PCollectionTranslation.java:36)
at org.apache.beam.runners.direct.repackaged.runners.core.construction.SdkComponents.registerPCollection(SdkComponents.java:138)
at org.apache.beam.runners.direct.repackaged.runners.core.construction.PTransformTranslation.toProto(PTransformTranslation.java:173)
at org.apache.beam.runners.direct.repackaged.runners.core.construction.ParDoTranslation.getParDoPayload(ParDoTranslation.java:515)
at org.apache.beam.runners.direct.repackaged.runners.core.construction.ParDoTranslation.isSplittable(ParDoTranslation.java:525)
at org.apache.beam.runners.direct.repackaged.runners.core.construction.PTransformMatchers.matches(PTransformMatchers.java:194)
at org.apache.beam.sdk.Pipeline.visitPrimitiveTransform(Pipeline.java:278)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:668)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.access0(TransformHierarchy.java:311)
at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245)
at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
at org.apache.beam.sdk.Pipeline.replace(Pipeline.java:256)
at org.apache.beam.sdk.Pipeline.replaceAll(Pipeline.java:209)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:173)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:62)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:353)
at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:335)
at exp.moloco.dataflow2.ReferenceTest.testBroken(ReferenceTest.java:110)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.apache.beam.sdk.testing.TestPipeline.evaluate(TestPipeline.java:324)
at org.apache.beam.sdk.testing.TestPipeline.evaluate(TestPipeline.java:324)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access[=13=]0(ParentRunner.java:58)
at org.junit.runners.ParentRunner.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: java.io.NotSerializableException: exp.moloco.dataflow2.ReferenceTest
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at java.util.HashMap.internalWriteEntries(HashMap.java:1789)
at java.util.HashMap.writeObject(HashMap.java:1363)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1128)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)
... 54 more
Process finished with exit code 255
有没有人遇到过类似的测试管道问题?
我还没有对它进行广泛的测试,但是我找不到 CoGroupByKey
& TestPipeline
的相关信息。
在生产中,相同的代码对我的团队来说工作得很好,我们想使用 TestPipeline
和 PAssert
添加一些单元测试。这就是我们解决这个问题的方式。
任何帮助将不胜感激!
异常的根本原因是 'java.io.NotSerializableException: exp.moloco.dataflow2.ReferenceTest'。某些对象无意中持有对外部 class 'ReferenceTest' 的引用。这些很可能是为 TupleTag 定义的匿名 classes。请尝试使它们成为常规 classes。
我可能遗漏了一些明显的东西,但出于某种原因我无法让 PAssert
& TestPipeline
与 CoGroupByKey
一起工作——但没有它,它工作正常。
这是一个参考测试文件,可以重现我面临的问题。 我测试了 beam sdk 2.4 和 2.5。
为了比较,testWorking
按预期工作,testBroken
有一个额外的步骤,如下所示:
// The following four lines causes an issue.
PCollectionTuple tuple =
KeyedPCollectionTuple.of(inTag1, pc1.apply("String to KV<String, String>", ParDo.of(new String2KV())))
.and(inTag2, pc2).apply("CoGroupByKey", CoGroupByKey.<String>create()).apply("Some Merge DoFn",
ParDo.of(new MergeDoFn(inTag1, inTag2, outTag2)).withOutputTags(outTag1, TupleTagList.of(outTag2)));
我得到的错误可以在下面的代码之后找到。
public class ReferenceTest {
@Rule
public final transient TestPipeline pipe1 = TestPipeline.create();
@Rule
public final transient TestPipeline pipe2 = TestPipeline.create();
public static class String2KV extends DoFn<String, KV<String, String>> {
@ProcessElement
public void processElement(ProcessContext c) {
// "key1:value1" -> ["key1", "value1"]
String[] tokens = c.element().split(":");
c.output(KV.of(tokens[0], tokens[1]));
}
}
public static class MergeDoFn extends DoFn<KV<String, CoGbkResult>, String> {
final TupleTag<String> inTag1;
final TupleTag<String> inTag2;
final TupleTag<String> outTag2;
public MergeDoFn(TupleTag<String> inTag1, TupleTag<String> inTag2, TupleTag<String> outTag2) {
this.inTag1 = inTag1;
this.inTag2 = inTag2;
this.outTag2 = outTag2;
}
@ProcessElement
public void processElement(ProcessContext c) {
String val1 = c.element().getValue().getOnly(inTag1);
String val2 = c.element().getValue().getOnly(inTag2);
// outTag1 = main output
// outTag2 = side output
c.output(outTag2, val1 + "," + val2);
}
}
@Test
public void testWorking() {
// Create two PCs for test.
PCollection<String> pc1 =
pipe1.apply("create pc1", Create.<String>of("key1:value1").withCoder(StringUtf8Coder.of()));
PCollection<KV<String, String>> pc2 =
pipe1.apply("create pc2", Create.<KV<String, String>>of(KV.of("key1", "key1:value2"))
.withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())));
// Sanity check.
PAssert.that(pc1).containsInAnyOrder("key1:value1");
PAssert.that(pc2).containsInAnyOrder(KV.of("key1", "key1:value2"));
pipe1.run();
}
@Test
public void testBroken() {
// Create two PCs for test.
PCollection<String> pc1 =
pipe2.apply("create pc1", Create.<String>of("key1:value1").withCoder(StringUtf8Coder.of()));
PCollection<KV<String, String>> pc2 =
pipe2.apply("create pc2", Create.<KV<String, String>>of(KV.of("key1", "key1:value2"))
.withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())));
// Sanity check.
PAssert.that(pc1).containsInAnyOrder("key1:value1");
PAssert.that(pc2).containsInAnyOrder(KV.of("key1", "key1:value2"));
TupleTag<String> inTag1 = new TupleTag<String>() {
private static final long serialVersionUID = 1L;
};
TupleTag<String> inTag2 = new TupleTag<String>() {
private static final long serialVersionUID = 1L;
};
TupleTag<String> outTag1 = new TupleTag<String>() {
private static final long serialVersionUID = 1L;
};
TupleTag<String> outTag2 = new TupleTag<String>() {
private static final long serialVersionUID = 1L;
};
// The following four lines causes an issue.
PCollectionTuple tuple =
KeyedPCollectionTuple.of(inTag1, pc1.apply("String to KV<String, String>", ParDo.of(new String2KV())))
.and(inTag2, pc2).apply("CoGroupByKey", CoGroupByKey.<String>create()).apply("Some Merge DoFn",
ParDo.of(new MergeDoFn(inTag1, inTag2, outTag2)).withOutputTags(outTag1, TupleTagList.of(outTag2)));
// Without the following two PAsserts, the CoGBK step above seems to cause an issue.
PAssert.that(tuple.get(outTag1)).empty();
PAssert.that(tuple.get(outTag2)).containsInAnyOrder("value1,value2");
pipe2.run();
}
}
这是错误:
java.lang.IllegalArgumentException: unable to serialize org.apache.beam.sdk.transforms.join.CoGbkResult$CoGbkResultCoder@217aa4f
at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:57)
at org.apache.beam.runners.direct.repackaged.runners.core.construction.CoderTranslation.toCustomCoder(CoderTranslation.java:121)
at org.apache.beam.runners.direct.repackaged.runners.core.construction.CoderTranslation.toProto(CoderTranslation.java:85)
at org.apache.beam.runners.direct.repackaged.runners.core.construction.SdkComponents.registerCoder(SdkComponents.java:183)
at org.apache.beam.runners.direct.repackaged.runners.core.construction.CoderTranslation.registerComponents(CoderTranslation.java:107)
at org.apache.beam.runners.direct.repackaged.runners.core.construction.CoderTranslation.toKnownCoder(CoderTranslation.java:91)
at org.apache.beam.runners.direct.repackaged.runners.core.construction.CoderTranslation.toProto(CoderTranslation.java:83)
at org.apache.beam.runners.direct.repackaged.runners.core.construction.SdkComponents.registerCoder(SdkComponents.java:183)
at org.apache.beam.runners.direct.repackaged.runners.core.construction.PCollectionTranslation.toProto(PCollectionTranslation.java:36)
at org.apache.beam.runners.direct.repackaged.runners.core.construction.SdkComponents.registerPCollection(SdkComponents.java:138)
at org.apache.beam.runners.direct.repackaged.runners.core.construction.PTransformTranslation.toProto(PTransformTranslation.java:173)
at org.apache.beam.runners.direct.repackaged.runners.core.construction.ParDoTranslation.getParDoPayload(ParDoTranslation.java:515)
at org.apache.beam.runners.direct.repackaged.runners.core.construction.ParDoTranslation.isSplittable(ParDoTranslation.java:525)
at org.apache.beam.runners.direct.repackaged.runners.core.construction.PTransformMatchers.matches(PTransformMatchers.java:194)
at org.apache.beam.sdk.Pipeline.visitPrimitiveTransform(Pipeline.java:278)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:668)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.access0(TransformHierarchy.java:311)
at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245)
at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
at org.apache.beam.sdk.Pipeline.replace(Pipeline.java:256)
at org.apache.beam.sdk.Pipeline.replaceAll(Pipeline.java:209)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:173)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:62)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:353)
at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:335)
at exp.moloco.dataflow2.ReferenceTest.testBroken(ReferenceTest.java:110)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.apache.beam.sdk.testing.TestPipeline.evaluate(TestPipeline.java:324)
at org.apache.beam.sdk.testing.TestPipeline.evaluate(TestPipeline.java:324)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access[=13=]0(ParentRunner.java:58)
at org.junit.runners.ParentRunner.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: java.io.NotSerializableException: exp.moloco.dataflow2.ReferenceTest
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at java.util.HashMap.internalWriteEntries(HashMap.java:1789)
at java.util.HashMap.writeObject(HashMap.java:1363)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1128)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)
... 54 more
Process finished with exit code 255
有没有人遇到过类似的测试管道问题?
我还没有对它进行广泛的测试,但是我找不到 CoGroupByKey
& TestPipeline
的相关信息。
在生产中,相同的代码对我的团队来说工作得很好,我们想使用 TestPipeline
和 PAssert
添加一些单元测试。这就是我们解决这个问题的方式。
任何帮助将不胜感激!
异常的根本原因是 'java.io.NotSerializableException: exp.moloco.dataflow2.ReferenceTest'。某些对象无意中持有对外部 class 'ReferenceTest' 的引用。这些很可能是为 TupleTag 定义的匿名 classes。请尝试使它们成为常规 classes。