Writing Apache Beam Filter in Scala
I'm writing an Apache Beam project in Scala, using the Java SDK, and can't work out the syntax needed to use Filter.by. Here is an example of what I've tried:
class Test extends SerializableFunction[String, Boolean] {
def apply(m: String): Boolean = true
}
val pipeline = Pipeline.create();
pipeline.apply(
KafkaIO.read[String,String]()
.withBootstrapServers("localhost:9092")
.withTopic("test-topic")
.withKeyDeserializer(classOf[StringDeserializer])
.withValueDeserializer(classOf[StringDeserializer])
.withoutMetadata()
)
.apply(Values.create())
.apply(Filter.by((m: String) => true))
//And I've tried this
.apply(Filter.by(new Test()))
This gives me the following error:
[error] example.scala:61:19: overloaded method value by with alternatives:
[error] [T, PredicateT <: org.apache.beam.sdk.transforms.SerializableFunction[T,Boolean]](predicate: PredicateT)org.apache.beam.sdk.transforms.Filter[T] <and>
[error] [T, PredicateT <: org.apache.beam.sdk.transforms.ProcessFunction[T,Boolean]](predicate: PredicateT)org.apache.beam.sdk.transforms.Filter[T]
[error] cannot be applied to (com.example.Test)
[error] .apply(Filter.by(new Test()))
[error] ^
[error] one error found
The documentation for Filter.by is here: https://beam.apache.org/releases/javadoc/2.2.0/org/apache/beam/sdk/transforms/Filter.html#by-PredicateT-
First, you might be interested in scio, Spotify's Scala API for Apache Beam, which is much cleaner to use from Scala.
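For comparison, here is a minimal, untested sketch of the same filter written with scio. It assumes the scio-core dependency at version 0.8 or later (where `ScioContext.run()` replaced `close()`) and a runner such as the DirectRunner on the classpath. The object name and the `--output` argument are hypothetical. In scio, `filter` takes an ordinary Scala function returning `scala.Boolean`, so no `ProcessFunction` or `java.lang.Boolean` typing is needed:

```scala
import com.spotify.scio.ContextAndArgs

// Hypothetical object name; assumes scio-core 0.8+ is on the classpath.
object ScioFilterExample {
  def main(cmdlineArgs: Array[String]): Unit = {
    // Parses Beam pipeline options plus custom --key=value arguments.
    val (sc, args) = ContextAndArgs(cmdlineArgs)

    sc.parallelize(Seq("one", "two", "three"))
      // A plain Scala lambda returning scala.Boolean; scio wraps it for Beam internally.
      .filter(_.length == 3)
      // --output is a hypothetical argument name chosen for this sketch.
      .saveAsTextFile(args("output"))

    // Submit the pipeline and block until it completes.
    sc.run().waitUntilFinish()
  }
}
```

Run it with something like `--output=/tmp/filtered`; only "one" and "two" should be written.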
Otherwise, I was able to create a Filter.by with the Java SDK by explicitly specifying the ProcessFunction type on the lambda (tested with Beam 2.16.0):
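import org.apache.beam.sdk.testing.{PAssert, TestPipeline}
import org.apache.beam.sdk.transforms.{Create, Filter, ProcessFunction}
// Using test pipeline outside of a JUnit @Rule
val pipeline = TestPipeline.create
pipeline.enableAbandonedNodeEnforcement(false)
// Apply a filter; the lambda is explicitly typed as a ProcessFunction returning java.lang.Boolean.
val predicate: ProcessFunction[String, java.lang.Boolean] = m => m.length == 3
val output = pipeline.apply(Create.of("one", "two", "three"))
  .apply(Filter.by(predicate))
PAssert.that(output).containsInAnyOrder("one", "two")
// Run the test.
pipeline.run()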
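(Note the java.lang.Boolean return type, not scala.Boolean. The bounds of Filter.by are Java generics over java.lang.Boolean, so a function typed with scala.Boolean, like the Test class in the question, satisfies neither overload.)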