Spark Structured Streaming not able to see the record details

I am trying to process the records coming from a read stream and print each row, but I cannot see any of the printed statements in my driver or executor logs. What could be wrong?

  1. For every record or batch (ideally), I want to print a message.
  2. For every batch, I want to run a process.
val kafka = spark.readStream
    .format("kafka")
    .option("maxOffsetsPerTrigger", MAX_OFFSETS_PER_TRIGGER)
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS) 
    .option("subscribe", topic) // comma separated list of topics
    .option("startingOffsets", "earliest")
    .option("checkpointLocation", CHECKPOINT_LOCATION)
    .option("failOnDataLoss", "false")
    .option("minPartitions", sys.env.getOrElse("MIN_PARTITIONS", "64").toInt)
    .load()


  import spark.implicits._

  println("JSON output to write into sink")

  val consoleOutput = kafka.selectExpr("CAST(key AS STRING) as key", "CAST(value AS STRING) as value")
    //.select(from_json($"json", schema) as "data")
    //.select("data.*")
    //.select(get_json_object(($"value").cast("string"), "$").alias("body"))
    .writeStream
    .foreach(new ForeachWriter[Row] {
      override def open(partitionId: Long, epochId: Long): Boolean = true

      override def process(row: Row): Unit = {
        logger.info(
          s"Record received in data frame is -> " + row.mkString )
          runProcess() // Want to run some process every microbatch

      }

      override def close(errorOrNull: Throwable): Unit = {}


    })
    .outputMode("append")
    .format("console")
    .trigger(Trigger.ProcessingTime("30 seconds"))
    .start()


  consoleOutput.awaitTermination()

}

I copied your code, and it ran fine without the runProcess function call.
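
Since removing runProcess is what makes the difference, one thing worth trying is to wrap it in a try/catch inside process, so that any exception it throws is logged instead of silently failing the sink task. A minimal sketch, assuming the same logger, runProcess and Row import as in your code:

new ForeachWriter[Row] {
  override def open(partitionId: Long, epochId: Long): Boolean = true

  override def process(row: Row): Unit = {
    logger.info(s"Record received in data frame is -> " + row.mkString)
    try {
      runProcess()
    } catch {
      case e: Throwable =>
        logger.error("runProcess failed for this record", e)
        throw e // rethrow so the failure is visible in the query status
    }
  }

  override def close(errorOrNull: Throwable): Unit = {}
}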

If you plan to do two different things, I would recommend two separate queries after selecting the relevant fields from the Kafka topic:

val kafkaSelection = kafka.selectExpr("CAST(key AS STRING) as key", "CAST(value AS STRING) as value")

1. For every record or batch (ideally), I want to print the message

val query1 = kafkaSelection
  .writeStream
  .outputMode("append")
  .format("console")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .option("checkpointLocation", CHECKPOINT_LOCATION1)
  .start()

2. For every batch, I want to run a process.

val query2 = kafkaSelection
  .writeStream
  .foreach(new ForeachWriter[Row] {
      override def open(partitionId: Long, epochId: Long): Boolean = true

      override def process(row: Row): Unit = {
        logger.info(
          s"Record received in data frame is -> " + row.mkString )
          runProcess() // Want to run some process every microbatch

      }

      override def close(errorOrNull: Throwable): Unit = {}

    })
  .outputMode("append")
  .option("checkpointLocation", CHECKPOINT_LOCATION2)
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start()
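
Side note: since requirement 2 is really "run something once per batch" rather than once per record, a foreachBatch sink (available since Spark 2.4) may be a closer fit than ForeachWriter, whose process method is called for every row. A minimal sketch, assuming the same kafkaSelection, logger, runProcess and CHECKPOINT_LOCATION2 as above (perBatchQuery is just an illustrative name); the function runs on the driver, so the log line shows up in the driver log:

import org.apache.spark.sql.DataFrame

val perBatchQuery = kafkaSelection
  .writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // Invoked once per micro-batch on the driver
    logger.info(s"Micro-batch $batchId contains ${batchDF.count()} records")
    runProcess()
  }
  .option("checkpointLocation", CHECKPOINT_LOCATION2) // reuse only if this replaces query2
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start()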


Also note that I have set a separate checkpoint location for each query, which ensures consistent tracking of the Kafka offsets. Make sure you use two different checkpoint locations, one for each query. The queries can run in parallel.
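
For illustration, the two locations could be defined as distinct paths (hypothetical values; in a real job they should point to durable storage such as HDFS or S3):

val CHECKPOINT_LOCATION1 = "hdfs:///checkpoints/query1-console"
val CHECKPOINT_LOCATION2 = "hdfs:///checkpoints/query2-foreach"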

It is important to define both queries before waiting for their termination:

query1.awaitTermination()
query2.awaitTermination()
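
As a variant, when several queries run in the same session, the StreamingQueryManager can be used to block until any one of them stops or fails:

// Blocks until any active query in this SparkSession terminates
spark.streams.awaitAnyTermination()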

Tested with Spark 2.4.5.