Spark metrics are empty

The query completes successfully and the data shows up in the sink, but the metrics the listener receives are empty:

spark.listenerManager.register(new QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
    println(qe.observedMetrics)
    println(qe.executedPlan.metrics)
  }

  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
    // no-op
  }
})

Result:

Map()
Map()

Why? How can I tell Spark to collect these metrics?

Edit:

Spark 3.1.2 on K8s (just using the examples from the Spark docs)

val spark = SparkSession
  .builder()
  .appName("test-app")
  .getOrCreate()

// at this point comes the 'spark.listenerManager.register' part

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("subscribe", "input")
  .option("maxOffsetsPerTrigger", 1000)
  .load()

val ds = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("topic", "output")
  .start()

ds.awaitTermination()

A QueryExecutionListener only fires for batch jobs; for a streaming job it should be a StreamingQueryListener:

val spark: SparkSession = ...

spark.streams.addListener(new StreamingQueryListener() {
    override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
        println("Query started: " + queryStarted.id)
    }
    override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
        println("Query terminated: " + queryTerminated.id)
    }
    override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
        println("Query made progress: " + queryProgress.progress)
    }
})

See https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#reporting-metrics-programmatically-using-asynchronous-apis
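
If the goal was specifically the `observedMetrics` from the original listener, the same idea carries over to streaming: attach named aggregates with `Dataset.observe` (Spark 3.0+) before the sink, and read them back from the progress event. A minimal sketch, assuming the `df` Kafka stream from above; the metric name "row_stats" and its aggregates are arbitrary:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Attach named aggregates to the stream before writing it out
val observed = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .observe("row_stats", count(lit(1)).as("rows"), count(col("key")).as("keys"))

spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryStarted(e: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(e: QueryTerminatedEvent): Unit = ()
  override def onQueryProgress(e: QueryProgressEvent): Unit = {
    // Built-in per-micro-batch numbers are on the progress object itself
    println(s"batch=${e.progress.batchId} inputRows=${e.progress.numInputRows}")
    // observedMetrics maps each observe() name to a Row of its aggregates;
    // it is a java.util.Map, so get() returns null until the first batch
    Option(e.progress.observedMetrics.get("row_stats")).foreach { row =>
      println(s"rows=${row.getAs[Long]("rows")} keys=${row.getAs[Long]("keys")}")
    }
  }
})

Then start the `writeStream` on `observed` instead of the raw projection, so the observation node sits in the executed plan.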