如何从 Spark Structured Streaming 获取 Kafka 输出中的批次 ID

How to get batch ID in Kafka output from Spark Structured Streaming

我 运行 我的 Spark Structured Streaming 作业处于更新模式,无法弄清楚是否可以获取每个更新的批次 ID。例如,当您以更新模式输出到控制台时,Spark 将在输出时显示每个批号:

-------------------------------------------
Batch: 0
-------------------------------------------
...
-------------------------------------------
Batch: 1
-------------------------------------------
...

等等。 我需要将同样的信息添加到我发送给 Kafka 的每条消息中。为此我只能使用 Spark 2.3,所以我不能使用 forEachBatch。

我的工作输出一组特定维度的聚合指标。每个触发器,指标可能自上次触发器以来已更新 - 具有更新指标的维度将在下一批中输出,因为我 运行 处于更新模式。当我将这些输出到 Kafka 时,我需要知道哪个批次是最新的——因此需要批次号。我认为 forEachBatch 可以满足我的需要,但不幸的是我无法访问 Spark 2.4。我可以使用 forEach 来完成这个吗?我仅限于使用更新模式,因为延迟事件可能会进入并更新之前已经输出的指标。

这是我用来测试的控制台模式。此输出分别显示每个批次及其编号:

StreamingQuery query = logs.writeStream()
        .format("console")
        .outputMode(OutputMode.Update())
        .start();

我想做这样的事情

StreamingQuery query = agg.WriteStream()
    .format("kafka")
    .outputMode(OutputMode.Update())
    .option("kafka.bootstrap.servers", "myconnection")
    .Option("topic", "mytopic")
    .Start();

但仍然保留在 mytopic 中判断消息来自哪个批次的能力。这可能吗?

我认为您可以使用 ForeachWriter

中的版本号 long version

您可以像这样实现自己的 KafkaCustomSink。


class KafkaCustomSink(val config: Config) extends ForeachWriter[String] {
  var producer: KafkaProducer[String, String] = _
  var _version: Long = _

  override def open(partitionId: Long, version: Long): Boolean = {
    _version = version
    val props = new Properties()
    props.put("bootstrap.servers", config(Constant.OUTPUT_BOOTSTRAP_SERVER))
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("acks", "0")
    producer = new KafkaProducer[String, String](props)
    true
  }

  override def process(value: String): Unit = {
    //use version here
    val record = new ProducerRecord[String, String](config(Constant.OUTPUT_TOPIC), null, "version : %s, data : %s".format(_version, value))
    producer.send(record)
  }

  override def close(errorOrNull: Throwable): Unit = {
    producer.close()
  }
}

并将其分配给

      logs
          .writeStream
          .outputMode("update")
          .foreach(new KafkaCustomSink(config))
          .trigger(Trigger.ProcessingTime(config(Constant.TRIGGER_INTERVAL).toInt, TimeUnit.SECONDS))
          .option("checkpointLocation", config(Constant.CHECKPOINT_LOCATION))