如何使用外部触发器停止结构化流式查询?

How to use a external trigger to stop structured streaming query?

我正在使用 spark 结构化流,我想检查是否存在 stop 文件以退出我的程序。

我可以这样做:

def main(args: Array[String]) = {
    val query = SparkSession...load.writeStream.foreachBatch{
      if (stop file exist) exit(0)
      // do some processing here
    }.start()
    // add Execute Listener here to listen query
    query.awaitTermination()
}

但是,只有在该查询中附加了新行时才会触发此操作 table。如果没有新行,stop 文件将没有任何影响。

有没有更好的实现这个触发器的想法?


上面是问题,感谢下面接受的答案,这是我的代码,终于可以正常工作了。

object QueryManager {
  def queryTerminator(query: StreamingQuery): Runnable = new Runnable {
    override def run() = {if(stop condition) query.stop()}
  }
  def listenTermination(query: StreamingQuery) = {
    Executors.newSingleThreadScheduledExecutor.scheduleWithFixedDelay(
      queryTerminator(query), initialDelay=1, delay=1, SECONDS
    )
  }
}
// and in main method
def main(args: Array[String]) = {
    val query = SparkSession...load.writeStream.foreachBatch{      
      // do some processing here
    }.start()
    // add Execute Listener here to listen query
    QueryManager.listenTermination(query)

    query.awaitTermination()


    // I am not familar with scala, 
    // but it seems would not exit if we do not add this
    System.exit(0) 
}

如有不妥之处请指出

Any better idea to implement this trigger?

流查询是结构化流应用程序的独立守护线程。它会一直运行直到使用 StreamingQuery.stop.

停止

至少有两种方法可以访问 运行 流式查询:

  1. DataStreamWriter.start()
  2. StreamingQueryManager

我们的想法是在您的结构化流应用程序中有一个 "control thread" 来监听停止请求(带有流查询的 ID)并简单地在 [= 上执行 stop 31=] 流式查询。


将 Spark Structured Streaming 应用程序视为具有多个线程的单 JVM 应用程序。您可以再使用一个来控制线程。这是基本的想法。



val query = df.writeStream.format("console").start()   // get the query object

query.id          // get the unique identifier of the running query that persists across restarts from checkpoint data

query.runId       // get the unique id of this run of the query, which will be generated at every start/restart

query.name        // get the name of the auto-generated or user-specified name

query.explain()   // print detailed explanations of the query

query.stop()      // stop the query

query.awaitTermination()   // block until query is terminated, with stop() or with error

query.exception       // the exception if the query has been terminated with error

query.recentProgress  // an array of the most recent progress updates for this query

query.lastProgress    // the most recent progress update of this streaming query

参考:https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#managing-streaming-queries

一个简单的 spark-shell 演示:

scala> val text = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load
22/04/09 16:40:00 WARN TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.
text: org.apache.spark.sql.DataFrame = [value: string]

scala> val words = text.as[String].flatMap(_.split(" "))
words: org.apache.spark.sql.Dataset[String] = [value: string]

scala> val counts = words.groupBy("value").count
counts: org.apache.spark.sql.DataFrame = [value: string, count: bigint]

scala> val writer = counts.writeStream.queryName("NetworkWordCounts").outputMode("update").format("console").option("truncate", false)
writer: org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row] = org.apache.spark.sql.streaming.DataStreamWriter@57167840

scala> val query = writer.start
22/04/09 16:42:15 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-55e06380-a953-4fb4-8af1-6a3fa6b40a4f. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
22/04/09 16:42:15 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
query: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@5bf7053d

-------------------------------------------
Batch: 0
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
+-----+-----+

scala> query.status
res0: org.apache.spark.sql.streaming.StreamingQueryStatus =
{
  "message" : "Waiting for data to arrive",
  "isDataAvailable" : false,
  "isTriggerActive" : false
}

scala> query.id
res1: java.util.UUID = e58ea08a-a251-47a9-a148-2f9660c41120

scala> query.name
res2: String = NetworkWordCounts

scala> spark.streams
res3: org.apache.spark.sql.streaming.StreamingQueryManager = org.apache.spark.sql.streaming.StreamingQueryManager@2bb09d00

scala> spark.streams.active.foreach(q => println(q.id, q.name))
(e58ea08a-a251-47a9-a148-2f9660c41120,NetworkWordCounts)

scala> spark.streams.get(query.id)
res8: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@5bf7053d

scala> spark.streams.get(query.id).stop

scala> spark.streams.get(query.id).status
java.lang.NullPointerException
  ... 47 elided

scala> query.status
res11: org.apache.spark.sql.streaming.StreamingQueryStatus =
{
  "message" : "Stopped",
  "isDataAvailable" : false,
  "isTriggerActive" : false
}

scala>