Apache Beam 不支持 Kotlin Iterable?
Kotlin Iterable not supported in Apache Beam?
Apache beam 似乎拒绝识别 Kotlin 的 Iterable
。这是一个示例代码:
@ProcessElement
fun processElement(
@Element input: KV<String, Iterable<String>>, receiver: OutputReceiver<String>
) {
val output = input.key + "|" + input.value.toString()
println("output: $output")
receiver.output(output)
}
我收到以下奇怪的错误:
java.lang.IllegalArgumentException:
...PrintString, @ProcessElement processElement(KV, OutputReceiver), @ProcessElement processElement(KV, OutputReceiver):
@Element argument must have type org.apache.beam.sdk.values.KV<java.lang.String, java.lang.Iterable<? extends java.lang.String>>
果然,如果我将 Iterable
替换为 java.lang.Iterable
,同样的代码也能正常工作。我做错了什么?
依赖版本:
- kotlin-jvm:
1.3.21
- org.apache.beam:
2.11.0
这是一个包含完整代码和堆栈跟踪的要点:
更新:
经过反复试验,我发现 List<String>
会抛出类似的异常,但 MutableList<String>
确实有效:
class PrintString: DoFn<KV<String, MutableList<String>>, String>() {
@ProcessElement
fun processElement(
@Element input: KV<String, MutableList<String>>, receiver: OutputReceiver<String>
) {
val output = input.key + "|" + input.value.toString()
println("output: $output")
receiver.output(output)
}
}
所以,这让我想起 Kotlin 的不可变集合实际上只是接口,底层集合仍然是可变的。但是,尝试将 Iterable
替换为 MutableIterable
会继续引发错误。
更新 2:
我使用上面的 MutableList
部署了我的 Kotlin 数据流作业,但作业失败了:
java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.ClassCastException:
org.apache.beam.runners.dataflow.worker.util.BatchGroupAlsoByWindowViaIteratorsFn$WindowReiterable cannot be cast to java.util.List
at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.output(GroupAlsoByWindowsParDoFn.java:184)
at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.outputWindowedValue(GroupAlsoByWindowFnRunner.java:102)
我不得不切换回使用 java.lang.Iterable
。
我对 kotlin 不是很熟悉,但您似乎需要导入 import java.lang.Iterable
才能在您的代码中使用它。
这看起来像是 Beam Kotlin SDK 中的错误。 @ProcessElement
方法的反射分析无法正常工作。您可以使用 ProcessContext ctx
而不是使用 @Element
参数来解决此问题。
当我们从 groupbykey.create() 获取可迭代对象时,请问如何解决这个问题。我不能像你那样使用 javalang iterable
对于那些遇到这个问题并在这里找到解决方法的人,我目前继续在 kotlin 中编写管道的解决方法是创建一个 Java static class 和创建的函数,包含并处理您的 Iterable(s)。然后可以将结果(以不可迭代的格式)传递回 kotlin。
我 运行 也遇到了这个问题,当在 GroupByKey
之后使用 ParDo
时。事实证明,在编写接受 GroupByKey
.
结果的 t运行sformation 时,Iterable
泛型类型中需要一个 @JvmWildcard
注释
请参阅下面的人为示例,该示例读取文件并按每行的第一个字符分组。
class BeamPipe {
class ConcatLines : DoFn<KV<String, Iterable<@JvmWildcard String>>, KV<String, String>>() {
@ProcessElement
fun processElement(@Element input: KV<String, Iterable<@JvmWildcard String>>, receiver: OutputReceiver<KV<String, String>>) {
receiver.output(KV.of(input.key, input.value.joinToString("\n")))
}
}
fun pipe(options: PipelineOptions) {
val file =
"testFile.txt"
val p = Pipeline.create(options)
p.apply(TextIO.read().from(file))
.apply("Key lines by first character",
WithKeys.of { line: String -> line[0].toString() }
.withKeyType(TypeDescriptors.strings()))
.apply("Group lines by first character", GroupByKey.create<String, String>())
.apply("Concatenate lines", ParDo.of(ConcatLines()))
.apply("Write to files", FileIO.writeDynamic<String, KV<String, String>>()
.by { it.key }
.withDestinationCoder(StringUtf8Coder.of())
.via(Contextful.fn(ProcessFunction { it.value }), TextIO.sink())
.to("whatever")
.withNaming { key -> FileIO.Write.defaultNaming(key, ".txt") }
)
p.run()
}
}
Apache beam 似乎拒绝识别 Kotlin 的 Iterable
。这是一个示例代码:
@ProcessElement
fun processElement(
@Element input: KV<String, Iterable<String>>, receiver: OutputReceiver<String>
) {
val output = input.key + "|" + input.value.toString()
println("output: $output")
receiver.output(output)
}
我收到以下奇怪的错误:
java.lang.IllegalArgumentException:
...PrintString, @ProcessElement processElement(KV, OutputReceiver), @ProcessElement processElement(KV, OutputReceiver):
@Element argument must have type org.apache.beam.sdk.values.KV<java.lang.String, java.lang.Iterable<? extends java.lang.String>>
果然,如果我将 Iterable
替换为 java.lang.Iterable
,同样的代码也能正常工作。我做错了什么?
依赖版本:
- kotlin-jvm:
1.3.21
- org.apache.beam:
2.11.0
这是一个包含完整代码和堆栈跟踪的要点:
更新:
经过反复试验,我发现 List<String>
会抛出类似的异常,但 MutableList<String>
确实有效:
class PrintString: DoFn<KV<String, MutableList<String>>, String>() {
@ProcessElement
fun processElement(
@Element input: KV<String, MutableList<String>>, receiver: OutputReceiver<String>
) {
val output = input.key + "|" + input.value.toString()
println("output: $output")
receiver.output(output)
}
}
所以,这让我想起 Kotlin 的不可变集合实际上只是接口,底层集合仍然是可变的。但是,尝试将 Iterable
替换为 MutableIterable
会继续引发错误。
更新 2:
我使用上面的 MutableList
部署了我的 Kotlin 数据流作业,但作业失败了:
java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.ClassCastException:
org.apache.beam.runners.dataflow.worker.util.BatchGroupAlsoByWindowViaIteratorsFn$WindowReiterable cannot be cast to java.util.List
at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.output(GroupAlsoByWindowsParDoFn.java:184)
at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.outputWindowedValue(GroupAlsoByWindowFnRunner.java:102)
我不得不切换回使用 java.lang.Iterable
。
我对 kotlin 不是很熟悉,但您似乎需要导入 import java.lang.Iterable
才能在您的代码中使用它。
这看起来像是 Beam Kotlin SDK 中的错误。 @ProcessElement
方法的反射分析无法正常工作。您可以使用 ProcessContext ctx
而不是使用 @Element
参数来解决此问题。
当我们从 groupbykey.create() 获取可迭代对象时,请问如何解决这个问题。我不能像你那样使用 javalang iterable
对于那些遇到这个问题并在这里找到解决方法的人,我目前继续在 kotlin 中编写管道的解决方法是创建一个 Java static class 和创建的函数,包含并处理您的 Iterable(s)。然后可以将结果(以不可迭代的格式)传递回 kotlin。
我 运行 也遇到了这个问题,当在 GroupByKey
之后使用 ParDo
时。事实证明,在编写接受 GroupByKey
.
Iterable
泛型类型中需要一个 @JvmWildcard
注释
请参阅下面的人为示例,该示例读取文件并按每行的第一个字符分组。
class BeamPipe {
class ConcatLines : DoFn<KV<String, Iterable<@JvmWildcard String>>, KV<String, String>>() {
@ProcessElement
fun processElement(@Element input: KV<String, Iterable<@JvmWildcard String>>, receiver: OutputReceiver<KV<String, String>>) {
receiver.output(KV.of(input.key, input.value.joinToString("\n")))
}
}
fun pipe(options: PipelineOptions) {
val file =
"testFile.txt"
val p = Pipeline.create(options)
p.apply(TextIO.read().from(file))
.apply("Key lines by first character",
WithKeys.of { line: String -> line[0].toString() }
.withKeyType(TypeDescriptors.strings()))
.apply("Group lines by first character", GroupByKey.create<String, String>())
.apply("Concatenate lines", ParDo.of(ConcatLines()))
.apply("Write to files", FileIO.writeDynamic<String, KV<String, String>>()
.by { it.key }
.withDestinationCoder(StringUtf8Coder.of())
.via(Contextful.fn(ProcessFunction { it.value }), TextIO.sink())
.to("whatever")
.withNaming { key -> FileIO.Write.defaultNaming(key, ".txt") }
)
p.run()
}
}