有没有办法动态停止 Spark Structured Streaming?

Is there a way to dynamically stop Spark Structured Streaming?

在我的场景中,我有几个数据集不时出现,我需要在我们的平台中摄取。摄取过程涉及几个转换步骤。其中之一是 Spark。特别是到目前为止,我使用的是 spark 结构化流媒体。基础设施还涉及kafka,spark structured streaming从中读取数据。

我想知道是否有一种方法可以检测何时没有其他内容可以从主题中消费一段时间来决定停止工作。那就是我想 运行 它花费的时间来消耗那个特定的数据集,然后停止它。由于特定原因,我们决定不使用 spark 的批处理版本。

因此是否存在任何超时或可用于检测没有更多数据传入并且所有内容都已处理的东西。

谢谢

Structured Streaming Monitoring Options

您可以使用 query.lastProgress 获取时间戳并围绕它构建逻辑。不要忘记将您的检查点保存到一个持久的、持久的、可用的存储中。

总结几条建​​议:

  1. 正如指出的那样,有监听器可以跟踪进度
  2. 据我所知,结构化 流式传输 doesn't yet support graceful shutdown

因此,一种选择是定期检查查询 activity, 动态 根据可配置状态关闭(当您确定没有进一步进展时 can/should制作):

// where you configure your spark job...
spark.streams.addListener(shutdownListener(spark))

// your job code starts here by calling "start()" on the stream...

// periodically await termination, checking for your shutdown state
while(!spark.sparkContext.isStopped) {
  if (shutdown) {
    println(s"Shutting down since first batch has completed...")
    spark.streams.active.foreach(_.stop())
    spark.stop()
  } else {
    // wait 10 seconds before checking again if work is complete
    spark.streams.awaitAnyTermination(10000)
  }
}

您的侦听器可以通过多种方式动态关闭。例如,如果您只等待一个批次,那么在第一次更新后就关闭:

var shutdown = false
def shutdownListener(spark: SparkSession) = new StreamingQueryListener() {
  override def onQueryStarted(_: QueryStartedEvent): Unit = println("Query started: " + queryStarted.id)
  override def onQueryTerminated(_: QueryTerminatedEvent): Unit = println("Query terminated! " + queryTerminated.id)
  override def onQueryProgress(_: QueryProgressEvent): Unit = shutdown = true
}

或者,如果您需要在更复杂的状态更改后关闭,您可以解析 queryProgress.progress 的 json 主体以确定是否在给定的 onQueryUpdate 时关闭事件触发。

你或许可以使用这个:-

def stopStreamQuery(query: StreamingQuery, awaitTerminationTimeMs: Long) {
    while (query.isActive) {
      try{
        if(query.lastProgress.numInputRows < 10){
          query.awaitTermination(1000)
        }
      }
      catch
      {
        case e:NullPointerException => println("First Batch")
      }
      Thread.sleep(500)
    }
  }

您可以创建一个 numInputRows 变量。