当我的编码器需要一个需要 UnionCoder 的 KvCoder 时,我如何在进行端到端测试时设置编码器
How do I setCoders when doing an end-to-end test when my coder requires a KvCoder which requires a UnionCoder
我正在使用 Java 云数据流 SDK,并且正在进行一些端到端测试。
@Test
public void testEndtoEnd() throws Exception {
TupleTag<Entity> tag1 = aTagFromElsewhere1;
TupleTag<Entity> tag2 = aTagFromElsewhere2;
TupleTagList tags = TupleTagList.of(tag1).and(tag2);
CoGbkResultSchema schema = new CoGbkResultSchema(tags);
JoinEntities myDoFn = new JoinEntities();
DoFnTester<KV<String, CoGbkResult>, Entity> fnTester = DoFnTester.of(myDoFn);
List<RawUnionValue> rawUnionValues = new ArrayList<RawUnionValue>();
Date validThruDate = new Date(System.currentTimeMillis() + 5000L);
rawUnionValues.add(new RawUnionValue(0, aValidEntity1)));
rawUnionValues.add(new RawUnionValue(1, aValidEntity2));
CoGbkResult result = new CoGbkResult(schema, rawUnionValues);
KV<String, CoGbkResult> aCoGbkPair = KV.of("Bleh", result);
Pipeline p = TestPipeline.create();
PCollection<KV<String, CoGbkResult>> input = p.apply(Create.of(aCoGbkPair))
.setCoder(KvCoder.of(StringUtf8Coder.of(), CoGbkResultCoder.of(UnionCoder, schema)));
PCollection<String> output = input.apply(new FormatEntitiesForTsv());
DataflowAssert.that(output).containsInAnyOrder(/**TODO: Create test data**/);
}
我遇到的问题是在 setCoder 中,我正在使用需要 UnionCoder 的 KvCoder.of()。我不确定如何获得这个 UnionCoder,我已经查看了它的 class,但它无法访问。
我该如何解决这个问题? (或者,如果有更好的方法来获取输入,我会洗耳恭听)。
感谢和欢呼:)
的确,这是 SDK 中的一个疏忽 - UnionCoder
应该是 public,而且它是前段时间在 Beam SDK 中制作的 public。您最好的选择是使用此更改构建您自己的 Dataflow SDK 版本,或者等待我们在 github 存储库中进行更改并等待下一个 Maven 版本(我将发送拉取请求并更新此答案)。
我正在使用 Java 云数据流 SDK,并且正在进行一些端到端测试。
@Test
public void testEndtoEnd() throws Exception {
TupleTag<Entity> tag1 = aTagFromElsewhere1;
TupleTag<Entity> tag2 = aTagFromElsewhere2;
TupleTagList tags = TupleTagList.of(tag1).and(tag2);
CoGbkResultSchema schema = new CoGbkResultSchema(tags);
JoinEntities myDoFn = new JoinEntities();
DoFnTester<KV<String, CoGbkResult>, Entity> fnTester = DoFnTester.of(myDoFn);
List<RawUnionValue> rawUnionValues = new ArrayList<RawUnionValue>();
Date validThruDate = new Date(System.currentTimeMillis() + 5000L);
rawUnionValues.add(new RawUnionValue(0, aValidEntity1)));
rawUnionValues.add(new RawUnionValue(1, aValidEntity2));
CoGbkResult result = new CoGbkResult(schema, rawUnionValues);
KV<String, CoGbkResult> aCoGbkPair = KV.of("Bleh", result);
Pipeline p = TestPipeline.create();
PCollection<KV<String, CoGbkResult>> input = p.apply(Create.of(aCoGbkPair))
.setCoder(KvCoder.of(StringUtf8Coder.of(), CoGbkResultCoder.of(UnionCoder, schema)));
PCollection<String> output = input.apply(new FormatEntitiesForTsv());
DataflowAssert.that(output).containsInAnyOrder(/**TODO: Create test data**/);
}
我遇到的问题是在 setCoder 中,我正在使用需要 UnionCoder 的 KvCoder.of()。我不确定如何获得这个 UnionCoder,我已经查看了它的 class,但它无法访问。
我该如何解决这个问题? (或者,如果有更好的方法来获取输入,我会洗耳恭听)。
感谢和欢呼:)
的确,这是 SDK 中的一个疏忽 - UnionCoder
应该是 public,而且它是前段时间在 Beam SDK 中制作的 public。您最好的选择是使用此更改构建您自己的 Dataflow SDK 版本,或者等待我们在 github 存储库中进行更改并等待下一个 Maven 版本(我将发送拉取请求并更新此答案)。