Writing an Apache Beam Filter in Scala

I'm writing an Apache Beam project in Scala. Beam is a Java SDK, and I can't work out the syntax needed to use Filter.by from Scala. Here is what I've tried:

    class Test extends SerializableFunction[String, Boolean] {
      def apply(m: String): Boolean = true
    }

    val pipeline = Pipeline.create();
    pipeline.apply(
      KafkaIO.read[String,String]()
        .withBootstrapServers("localhost:9092")
        .withTopic("test-topic")
        .withKeyDeserializer(classOf[StringDeserializer])
        .withValueDeserializer(classOf[StringDeserializer])
        .withoutMetadata()
    )
    .apply(Values.create())
    .apply(Filter.by((m: String) => true))
    // And I've also tried this:
    .apply(Filter.by(new Test()))

This gives me the following error:

    [error] example.scala:61:19: overloaded method value by with alternatives:
    [error]   [T, PredicateT <: org.apache.beam.sdk.transforms.SerializableFunction[T,Boolean]](predicate: PredicateT)org.apache.beam.sdk.transforms.Filter[T] <and>
    [error]   [T, PredicateT <: org.apache.beam.sdk.transforms.ProcessFunction[T,Boolean]](predicate: PredicateT)org.apache.beam.sdk.transforms.Filter[T]
    [error]  cannot be applied to (com.example.Test)
    [error]     .apply(Filter.by(new Test()))
    [error]                   ^
    [error] one error found

The documentation for Filter.by is here: https://beam.apache.org/releases/javadoc/2.2.0/org/apache/beam/sdk/transforms/Filter.html#by-PredicateT-

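First, you might be interested in scio, which is cleaner to use from Scala.

Otherwise, I managed to create a Filter.by with the Java SDK by explicitly specifying the ProcessFunction type on the lambda (tested with Beam 2.16.0):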

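    import org.apache.beam.sdk.testing.{PAssert, TestPipeline}
    import org.apache.beam.sdk.transforms.{Create, Filter, ProcessFunction}

    // Using test pipeline outside of a JUnit @Rule
    val pipeline = TestPipeline.create
    pipeline.enableAbandonedNodeEnforcement(false)

    // Applying a filter. The explicit type annotation tells Scala which
    // functional interface the lambda should implement.
    val predicate: ProcessFunction[String, java.lang.Boolean] = m => m.length == 3
    val output = pipeline.apply(Create.of("one", "two", "three"))
      .apply(Filter.by(predicate))
    PAssert.that(output).containsInAnyOrder("one", "two")

    // Run the test.
    pipeline.run()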

(Note that the return type is java.lang.Boolean, not scala.Boolean.)
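
To illustrate why that distinction matters, here is a minimal sketch that does not use Beam at all. The trait `Predicate` and the object `BooleanBoxing` are hypothetical names made up for this example; `Predicate` only stands in for a Java functional interface such as ProcessFunction. The sketch assumes nothing beyond the Scala standard library, and it only shows the typing issue, not the full Filter.by overload resolution.

```scala
// Hypothetical stand-in for a Java functional interface such as Beam's
// ProcessFunction. It is NOT a Beam type, just enough to show the typing.
trait Predicate[In, Out] extends Serializable {
  def apply(input: In): Out
}

object BooleanBoxing {
  // Declared with java.lang.Boolean: the scala.Boolean produced by `==`
  // is boxed by the implicit conversion Predef.boolean2Boolean.
  val boxed: Predicate[String, java.lang.Boolean] =
    new Predicate[String, java.lang.Boolean] {
      override def apply(m: String): java.lang.Boolean = m.length == 3
    }

  // Declared with scala.Boolean: a different, unrelated type argument.
  val unboxed: Predicate[String, Boolean] =
    new Predicate[String, Boolean] {
      override def apply(m: String): Boolean = m.length == 3
    }

  // Predicate is invariant in Out (as Java generics are when seen from
  // Scala), so the following assignment would not compile:
  //   val wrong: Predicate[String, java.lang.Boolean] = unboxed
}
```

This is the same mismatch as in the question's `Test` class: it extends SerializableFunction[String, Boolean] with scala.Boolean, whereas the `Boolean` in the bounds printed by the compiler error is java.lang.Boolean.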