如何在 Apache Beam 中同时使用 MapElements 和 KV?

How do I use MapElements and KV in together in Apache Beam?

我想做类似的事情:

PCollection<String> a = whatever;
PCollection<KV<String, User>> b = a.apply(
        MapElements.into(TypeDescriptor.of(KV<String, User>.class))
        .via(s -> KV.of(s, new User(s))));

其中 User 是带有 Arvo 编码器和考虑字符串的构造函数的自定义数据类型。

但是,我收到以下错误:

Cannot select from parameterized type

我试图将其更改为 TypeDescriptor.of(KV.class),但后来我得到:

Incompatible types; Required PCollection> but 'apply' was inferred to OutputT: no instance(s) of type variable(s) exists so that PCollection conforms to PCollection>

那么我应该如何使用 KVMapElements

我知道我想做的事情可以使用 ParDo 实现,我可以通过清除 new DoFn<String, KV<String, User>> 明确指定如何执行类型擦除,但 ParDo 不支持 lambda 函数。由于我们使用的是 Java 8,这似乎不太优雅....

由于 type erasure in Java 在编译期间,KV<String, User>.class 被转换为 KV.class 并且在运行时 KV.class 没有足够的信息来推断编码器,因为类型变量具有已被删除。

要绕过此限制,您需要使用一种在编译后保留类型信息的机制。例如你可以使用:

TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptor.of(User.class))

这与提供您自己的匿名相同class:

new TypeDescriptor<KV<String, User>> {}

提供带有类型变量绑定的匿名 classes 是目前在 Java 中绕过类型擦除的方法之一。

尝试使用 SimpleFunction - 它保留类型信息