当我的编码器需要一个需要 UnionCoder 的 KvCoder 时,我如何在进行端到端测试时设置编码器

How do I setCoders when doing an end-to-end test when my coder requires a KvCoder which requires a UnionCoder

我正在使用 Java 云数据流 SDK,并且正在进行一些端到端测试。

@Test
    public void testEndtoEnd() throws Exception {
        TupleTag<Entity> tag1 = aTagFromElsewhere1;
        TupleTag<Entity> tag2 = aTagFromElsewhere2;
        TupleTagList tags = TupleTagList.of(tag1).and(tag2);
        CoGbkResultSchema schema = new CoGbkResultSchema(tags);

        JoinEntities myDoFn = new JoinEntities();
        DoFnTester<KV<String, CoGbkResult>, Entity> fnTester = DoFnTester.of(myDoFn);     
        List<RawUnionValue> rawUnionValues = new ArrayList<RawUnionValue>();
        Date validThruDate = new Date(System.currentTimeMillis() + 5000L);
        rawUnionValues.add(new RawUnionValue(0, aValidEntity1)));
        rawUnionValues.add(new RawUnionValue(1, aValidEntity2));
        CoGbkResult result = new CoGbkResult(schema, rawUnionValues);

        KV<String, CoGbkResult> aCoGbkPair = KV.of("Bleh", result);

        Pipeline p = TestPipeline.create();

        PCollection<KV<String, CoGbkResult>> input = p.apply(Create.of(aCoGbkPair))
                .setCoder(KvCoder.of(StringUtf8Coder.of(), CoGbkResultCoder.of(UnionCoder, schema)));

        PCollection<String> output = input.apply(new FormatEntitiesForTsv());

        DataflowAssert.that(output).containsInAnyOrder(/**TODO: Create test data**/);
    }

我遇到的问题是在 setCoder 中,我正在使用需要 UnionCoder 的 KvCoder.of()。我不确定如何获得这个 UnionCoder,我已经查看了它的 class,但它无法访问。

我该如何解决这个问题? (或者,如果有更好的方法来获取输入,我会洗耳恭听)。

感谢和欢呼:)

的确,这是 SDK 中的一个疏忽 - UnionCoder 应该是 public,而且它是前段时间在 Beam SDK 中制作的 public。您最好的选择是使用此更改构建您自己的 Dataflow SDK 版本,或者等待我们在 github 存储库中进行更改并等待下一个 Maven 版本(我将发送拉取请求并更新此答案)。