How to use an external trigger to stop a Structured Streaming query?
I am using Spark Structured Streaming, and I want to check whether a stop file exists so that I can exit my program.
I could do something like this:
def main(args: Array[String]): Unit = {
  val query = SparkSession...load.writeStream.foreachBatch { (batch, batchId) =>
    if (stop file exists) exit(0) // pseudocode: check for the stop file
    // do some processing here
  }.start()
  // some kind of listener to watch the query would go here
  query.awaitTermination()
}
However, this only fires when new rows are appended to the table this query reads from. If no new rows arrive, the stop file has no effect.
Is there a better way to implement this trigger?
That was the question. Thanks to the accepted answer below, here is my code, which finally works:
import java.util.concurrent.{Executors, TimeUnit}
import org.apache.spark.sql.streaming.StreamingQuery

object QueryManager {
  def queryTerminator(query: StreamingQuery): Runnable = new Runnable {
    // pseudocode: your stop condition (see the stop-file sketch below)
    override def run(): Unit = if (stop condition) query.stop()
  }

  def listenTermination(query: StreamingQuery) = {
    // check the stop condition once per second
    Executors.newSingleThreadScheduledExecutor.scheduleWithFixedDelay(
      queryTerminator(query), /* initialDelay = */ 1, /* delay = */ 1, TimeUnit.SECONDS
    )
  }
}
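One note on the design choice: scheduleWithFixedDelay waits the full delay after each check finishes before starting the next one, so a slow stop-condition check simply delays the next check rather than triggering back-to-back catch-up runs, as scheduleAtFixedRate would after a slow run.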
// and in the main method
def main(args: Array[String]): Unit = {
  val query = SparkSession...load.writeStream.foreachBatch { (batch, batchId) =>
    // do some processing here
  }.start()
  // watch the query for the external stop condition
  QueryManager.listenTermination(query)
  query.awaitTermination()
  // Without this the JVM does not exit after the query stops, because the
  // scheduled executor's thread is non-daemon.
  System.exit(0)
}
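For reference, a minimal sketch of what the stop condition itself could look like as a stop-file check; the path /tmp/stop-streaming is just a hypothetical example:

import java.nio.file.{Files, Paths}
import org.apache.spark.sql.streaming.StreamingQuery

// Hypothetical stop condition: stop the query once /tmp/stop-streaming appears.
def stopFileTerminator(query: StreamingQuery): Runnable = new Runnable {
  override def run(): Unit =
    if (Files.exists(Paths.get("/tmp/stop-streaming"))) query.stop()
}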
Please point out anything that could be improved.
Is there a better way to implement this trigger?
A streaming query runs as a separate daemon thread of a Structured Streaming application. It keeps running until it is stopped with StreamingQuery.stop.
There are at least two ways to access a running streaming query: through the StreamingQuery handle returned by DataStreamWriter.start, or through the StreamingQueryManager available as spark.streams.
The idea is to have a "control thread" in your Structured Streaming application that listens for stop requests (carrying the ID of a streaming query) and simply executes stop on that query.
Think of a Spark Structured Streaming application as a single-JVM application with multiple threads. You can use one more of those threads as the control thread. That is the basic idea.
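A minimal sketch of such a control thread, assuming the stop request arrives as a query ID from some external mechanism (file, socket, REST endpoint, ...); waitForStopRequest() below is a hypothetical placeholder for that mechanism:

import java.util.UUID
import org.apache.spark.sql.SparkSession

// Control thread sketch: block until a stop request carrying a query ID
// arrives, then look the query up in the StreamingQueryManager and stop it.
def controlThread(spark: SparkSession): Thread = new Thread(() => {
  val queryId: UUID = waitForStopRequest() // hypothetical: blocks until a request arrives
  // spark.streams.get returns null if there is no active query with this ID
  Option(spark.streams.get(queryId)).foreach(_.stop())
})

The StreamingQuery handle itself exposes everything you need: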
val query = df.writeStream.format("console").start() // get the query object
query.id // get the unique identifier of the running query that persists across restarts from checkpoint data
query.runId // get the unique id of this run of the query, which will be generated at every start/restart
query.name // get the name of the auto-generated or user-specified name
query.explain() // print detailed explanations of the query
query.stop() // stop the query
query.awaitTermination() // block until query is terminated, with stop() or with error
query.exception // the exception if the query has been terminated with error
query.recentProgress // an array of the most recent progress updates for this query
query.lastProgress // the most recent progress update of this streaming query
A simple spark-shell demo:
scala> val text = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load
22/04/09 16:40:00 WARN TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.
text: org.apache.spark.sql.DataFrame = [value: string]
scala> val words = text.as[String].flatMap(_.split(" "))
words: org.apache.spark.sql.Dataset[String] = [value: string]
scala> val counts = words.groupBy("value").count
counts: org.apache.spark.sql.DataFrame = [value: string, count: bigint]
scala> val writer = counts.writeStream.queryName("NetworkWordCounts").outputMode("update").format("console").option("truncate", false)
writer: org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row] = org.apache.spark.sql.streaming.DataStreamWriter@57167840
scala> val query = writer.start
22/04/09 16:42:15 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-55e06380-a953-4fb4-8af1-6a3fa6b40a4f. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
22/04/09 16:42:15 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
query: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@5bf7053d
-------------------------------------------
Batch: 0
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
+-----+-----+
scala> query.status
res0: org.apache.spark.sql.streaming.StreamingQueryStatus =
{
"message" : "Waiting for data to arrive",
"isDataAvailable" : false,
"isTriggerActive" : false
}
scala> query.id
res1: java.util.UUID = e58ea08a-a251-47a9-a148-2f9660c41120
scala> query.name
res2: String = NetworkWordCounts
scala> spark.streams
res3: org.apache.spark.sql.streaming.StreamingQueryManager = org.apache.spark.sql.streaming.StreamingQueryManager@2bb09d00
scala> spark.streams.active.foreach(q => println(q.id, q.name))
(e58ea08a-a251-47a9-a148-2f9660c41120,NetworkWordCounts)
scala> spark.streams.get(query.id)
res8: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@5bf7053d
scala> spark.streams.get(query.id).stop
scala> spark.streams.get(query.id).status
java.lang.NullPointerException
... 47 elided
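The NullPointerException above is expected: spark.streams.get only returns active queries, so after stop the lookup returns null. The original query handle remains valid, as the final query.status call shows.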
scala> query.status
res11: org.apache.spark.sql.streaming.StreamingQueryStatus =
{
"message" : "Stopped",
"isDataAvailable" : false,
"isTriggerActive" : false
}
scala>