What do foreachBatches contain in a streaming query from multiple Kafka topics?

Given a DataStreamReader configured to subscribe to multiple topics like this (see here):

// Subscribe to multiple topics
spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2,topic3")
  .load()

When I use foreachBatch on top of this, what will the batches contain?

In my use case, I'd like to have batches with messages coming from one topic only. Is it possible to configure this?

The batch will contain messages from all the topics (I'd say partitions) that your consumer is subscribed to.

Quoting the official documentation in Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher):

// Subscribe to multiple topics

...
.option("subscribe", "topic1,topic2")

The code above is what the underlying Kafka consumer (of the streaming query) subscribes to.

When I use foreachBatch on top of this, what will the batches contain?

  • Each batch will only contain messages from one topic?

No — as answered above, a single batch can contain messages from any of the subscribed topics.

I'd like to have batches with messages coming from one topic only. Is it possible to configure this?

That is also documented in the Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher):

Each row in the source has the following schema:

...

topic

In other words, the input Dataset has a topic column with the name of the topic a given row (record) came from.

In order to have "batches with messages coming from one topic only", you simply filter (or where) on a single topic, e.g.

val messages: DataFrame = ...
assert(messages.isStreaming)

import spark.implicits._  // for the $"topic" column syntax

messages
  .writeStream
  .foreachBatch { (df: DataFrame, batchId: Long) =>
    val topic1Only = df.where($"topic" === "topic1")
    val topic2Only = df.where($"topic" === "topic2")
    ...
  }
  .start()
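To make the per-topic split concrete, here is a plain-Scala sketch (no Spark needed) of the same idea: each element stands in for a row of the Kafka source, which carries a topic column alongside its payload. KafkaRow and the sample data are hypothetical, for illustration only.

```scala
// KafkaRow is a stand-in for one row of the Kafka source (hypothetical).
case class KafkaRow(topic: String, value: String)

// One hypothetical micro-batch mixing rows from two subscribed topics --
// this is what foreachBatch hands you when you subscribe to several topics.
val batch = Seq(
  KafkaRow("topic1", "a"),
  KafkaRow("topic2", "b"),
  KafkaRow("topic1", "c"))

// Inside foreachBatch the split is df.where($"topic" === "topic1");
// on a plain collection it is an ordinary filter (or groupBy) per topic.
val byTopic: Map[String, Seq[KafkaRow]] = batch.groupBy(_.topic)

println(byTopic("topic1").map(_.value))  // List(a, c)
```

The groupBy mirrors filtering once per topic: every row lands in exactly one bucket, keyed by the topic column it arrived on.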