KSQL join WHERE 子句的 java/scala kafka 流等效项是什么?

What is java/scala kafka streams equivalent for KSQL join WHERE clause?

假设我有 2 个 kafka 流(kafka-streams-scala 库,版本 2.2.0):

val builder: StreamsBuilder = new StreamsBuilder
val stream1: KStream[String, GenericRecord] = builder.stream[String, GenericRecord]("topic1")
val stream2: KStream[String, GenericRecord] = builder.stream[String, GenericRecord]("topic2")

他们的加入:

val stream3: KStream[String, MyClass] = flights.join(schedules)((r1, r2) =>  MyClass(r1.get("f1"), r2.get("f2")), JoinWindows.of(Duration.ofSeconds(30))

KSQL 中 WHERE 子句的等价物是什么? (参见 late_orders 流)流 API? 只使用 stream3.filter 是个好主意吗?这种方式能和KSQL创建流一样高效吗?

What is the equivalent of WHERE clause available in KSQL? (see late_orders stream) for streams API?

是:

  • KStream#filter(),其中 returns 已过滤 KStream
  • KTable#filter(),其中 returns 已过滤 KTable

https://kafka.apache.org/documentation/streams/developer-guide/dsl-api.html#stateless-transformations

Is it a good idea to just use stream3.filter?

是的。

Will this approach have the same efficiency as stream created by KSQL?

是的。