如何访问流式查询的指标?
How to access metrics of streaming query?
我使用 Spark 2.4。
我正在将 Spark Streaming 应用程序迁移到 Structured Streaming。
我正在研究每个批次的生成指标,我想控制每个微批次的统计数据。我对每个 microBatch 的 processingDelay
、schedulingDelay
和 totalDelay
指标以及在 Structured Streaming 中的何处找到它们感兴趣。
我尝试了以下方法,但它没有生成任何统计数据。
val recentBatchInfos = new StatsReportListener(60).batchInfos
val numberOfRecords = recentBatchInfos.map(_.numRecords).sum
谁能告诉我如何使用 have control over stats 并生成相应的指标?
Spark Structured Streaming和Spark Streaming的计算模型不同。 Structured Streaming 使用 Dataset
数据抽象,而 Spark Streaming 直接使用 RDD API。 Structured Streaming 中的可用指标会有所不同。
你真的应该使用StreamingQueryListener,它是监控界面:
Interface for listening to events related to StreamingQueries.
onQueryProgress(event: QueryProgressEvent): Unit
让您可以访问当前的 StreamingQueryProgress 以及所有当前的流媒体指标。
参考Spark Structured Streaming官方文档Reporting Metrics programmatically using Asynchronous APIs
我使用 Spark 2.4。
我正在将 Spark Streaming 应用程序迁移到 Structured Streaming。
我正在研究每个批次的生成指标,我想控制每个微批次的统计数据。我对每个 microBatch 的 processingDelay
、schedulingDelay
和 totalDelay
指标以及在 Structured Streaming 中的何处找到它们感兴趣。
我尝试了以下方法,但它没有生成任何统计数据。
val recentBatchInfos = new StatsReportListener(60).batchInfos
val numberOfRecords = recentBatchInfos.map(_.numRecords).sum
谁能告诉我如何使用 have control over stats 并生成相应的指标?
Spark Structured Streaming和Spark Streaming的计算模型不同。 Structured Streaming 使用 Dataset
数据抽象,而 Spark Streaming 直接使用 RDD API。 Structured Streaming 中的可用指标会有所不同。
你真的应该使用StreamingQueryListener,它是监控界面:
Interface for listening to events related to StreamingQueries.
onQueryProgress(event: QueryProgressEvent): Unit
让您可以访问当前的 StreamingQueryProgress 以及所有当前的流媒体指标。
参考Spark Structured Streaming官方文档Reporting Metrics programmatically using Asynchronous APIs