如何在 Apache Beam 中同时使用 MapElements 和 KV?
How do I use MapElements and KV in together in Apache Beam?
我想做类似的事情:
PCollection<String> a = whatever;
PCollection<KV<String, User>> b = a.apply(
MapElements.into(TypeDescriptor.of(KV<String, User>.class))
.via(s -> KV.of(s, new User(s))));
其中 User 是带有 Arvo 编码器和考虑字符串的构造函数的自定义数据类型。
但是,我收到以下错误:
Cannot select from parameterized type
我试图将其更改为 TypeDescriptor.of(KV.class)
,但后来我得到:
Incompatible types; Required PCollection> but 'apply' was inferred to OutputT: no instance(s) of type variable(s) exists so that PCollection conforms to PCollection>
那么我应该如何使用 KV
和 MapElements
?
我知道我想做的事情可以使用 ParDo
实现,我可以通过清除 new DoFn<String, KV<String, User>>
明确指定如何执行类型擦除,但 ParDo
不支持 lambda 函数。由于我们使用的是 Java 8,这似乎不太优雅....
由于 type erasure in Java 在编译期间,KV<String, User>.class
被转换为 KV.class
并且在运行时 KV.class
没有足够的信息来推断编码器,因为类型变量具有已被删除。
要绕过此限制,您需要使用一种在编译后保留类型信息的机制。例如你可以使用:
TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptor.of(User.class))
这与提供您自己的匿名相同class:
new TypeDescriptor<KV<String, User>> {}
提供带有类型变量绑定的匿名 classes 是目前在 Java 中绕过类型擦除的方法之一。
尝试使用 SimpleFunction - 它保留类型信息
我想做类似的事情:
PCollection<String> a = whatever;
PCollection<KV<String, User>> b = a.apply(
MapElements.into(TypeDescriptor.of(KV<String, User>.class))
.via(s -> KV.of(s, new User(s))));
其中 User 是带有 Arvo 编码器和考虑字符串的构造函数的自定义数据类型。
但是,我收到以下错误:
Cannot select from parameterized type
我试图将其更改为 TypeDescriptor.of(KV.class)
,但后来我得到:
Incompatible types; Required PCollection> but 'apply' was inferred to OutputT: no instance(s) of type variable(s) exists so that PCollection conforms to PCollection>
那么我应该如何使用 KV
和 MapElements
?
我知道我想做的事情可以使用 ParDo
实现,我可以通过清除 new DoFn<String, KV<String, User>>
明确指定如何执行类型擦除,但 ParDo
不支持 lambda 函数。由于我们使用的是 Java 8,这似乎不太优雅....
由于 type erasure in Java 在编译期间,KV<String, User>.class
被转换为 KV.class
并且在运行时 KV.class
没有足够的信息来推断编码器,因为类型变量具有已被删除。
要绕过此限制,您需要使用一种在编译后保留类型信息的机制。例如你可以使用:
TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptor.of(User.class))
这与提供您自己的匿名相同class:
new TypeDescriptor<KV<String, User>> {}
提供带有类型变量绑定的匿名 classes 是目前在 Java 中绕过类型擦除的方法之一。
尝试使用 SimpleFunction - 它保留类型信息