如何在单个 Spark 应用程序中启动多个流式查询?
How to start multiple streaming queries in a single Spark application?
我在 EMR 上为 运行 构建了几个 Spark 结构化流查询,它们是很长的 运行ning 查询,并且需要一直 运行,因为它们都是 ETL类型查询,当我向 EMR 上的 YARN 集群提交作业时,我可以提交单个 spark 应用程序。这样 spark 应用程序应该有多个流式查询。
我对如何以编程方式build/start同一提交中的多个流式查询感到困惑。
例如:我有这个代码:
case class SparkJobs(prop: Properties) extends Serializable {
def run() = {
Type1SparkJobBuilder(prop).build().awaitTermination()
Type1SparkJobBuilder(prop).build().awaitTermination()
}
}
我在主 class 中用 SparkJobs(new Properties()).run()
触发了这个
当我在 spark 历史服务器中看到时,只有第一个 spark 流作业 (Type1SparkJob) 是 运行ning。
以编程方式在同一个 spark 提交中触发多个流式查询的推荐方法是什么,我也找不到合适的文档。
由于您在第一个查询上调用 awaitTermination
,它将阻塞直到完成,然后再开始第二个查询。所以你想启动两个查询,然后使用 StreamingQueryManager.awaitAnyTermination
.
val query1 = df.writeStream.start()
val query2 = df.writeStream.start()
spark.streams.awaitAnyTermination()
除上述之外,默认情况下,Spark 使用 FIFO 调度程序。这意味着第一个查询在执行时获取集群中的所有资源。由于您正在尝试同时 运行 多个查询,因此您应该切换到 FAIR scheduler
如果您有一些查询应该拥有比其他查询更多的资源,那么您还可以调整各个调度程序池。
val query1=ds.writeSteam.{...}.start()
val query2=ds.writeSteam.{...}.start()
val query3=ds.writeSteam.{...}.start()
query3.awaitTermination()
AwaitTermination() 将阻止您的进程直到完成,这在流式应用程序中永远不会发生,在您最后一次查询时调用它应该可以解决您的问题
这取决于您是否要退出任何查询 stops/fails 或所有查询 stop/fail
如果退出任何查询,请使用 spark.streams.awaitAnyTermination()
。
如果退出所有查询:
选项 1:
val query1=ds.writeSteam.{...}.start()
val query2=ds.writeSteam.{...}.start()
query1.awaitTermination();
query2.awaitTermination();
选项 2:
val query1=ds.writeSteam.{...}.start()
val query2=ds.writeSteam.{...}.start()
spark.streams.active.foreach(x => x.awaitTermination())
选项 3:
while (!spark.streams.active.isEmpty) {
println("Queries currently still active: " + spark.streams.active.map(x => x.name).mkString(","))
spark.streams.awaitAnyTermination()
spark.streams.resetTerminated()
}
我在 EMR 上为 运行 构建了几个 Spark 结构化流查询,它们是很长的 运行ning 查询,并且需要一直 运行,因为它们都是 ETL类型查询,当我向 EMR 上的 YARN 集群提交作业时,我可以提交单个 spark 应用程序。这样 spark 应用程序应该有多个流式查询。
我对如何以编程方式build/start同一提交中的多个流式查询感到困惑。
例如:我有这个代码:
case class SparkJobs(prop: Properties) extends Serializable {
def run() = {
Type1SparkJobBuilder(prop).build().awaitTermination()
Type1SparkJobBuilder(prop).build().awaitTermination()
}
}
我在主 class 中用 SparkJobs(new Properties()).run()
当我在 spark 历史服务器中看到时,只有第一个 spark 流作业 (Type1SparkJob) 是 运行ning。
以编程方式在同一个 spark 提交中触发多个流式查询的推荐方法是什么,我也找不到合适的文档。
由于您在第一个查询上调用 awaitTermination
,它将阻塞直到完成,然后再开始第二个查询。所以你想启动两个查询,然后使用 StreamingQueryManager.awaitAnyTermination
.
val query1 = df.writeStream.start()
val query2 = df.writeStream.start()
spark.streams.awaitAnyTermination()
除上述之外,默认情况下,Spark 使用 FIFO 调度程序。这意味着第一个查询在执行时获取集群中的所有资源。由于您正在尝试同时 运行 多个查询,因此您应该切换到 FAIR scheduler
如果您有一些查询应该拥有比其他查询更多的资源,那么您还可以调整各个调度程序池。
val query1=ds.writeSteam.{...}.start()
val query2=ds.writeSteam.{...}.start()
val query3=ds.writeSteam.{...}.start()
query3.awaitTermination()
AwaitTermination() 将阻止您的进程直到完成,这在流式应用程序中永远不会发生,在您最后一次查询时调用它应该可以解决您的问题
这取决于您是否要退出任何查询 stops/fails 或所有查询 stop/fail
如果退出任何查询,请使用 spark.streams.awaitAnyTermination()
。
如果退出所有查询:
选项 1:
val query1=ds.writeSteam.{...}.start()
val query2=ds.writeSteam.{...}.start()
query1.awaitTermination();
query2.awaitTermination();
选项 2:
val query1=ds.writeSteam.{...}.start()
val query2=ds.writeSteam.{...}.start()
spark.streams.active.foreach(x => x.awaitTermination())
选项 3:
while (!spark.streams.active.isEmpty) {
println("Queries currently still active: " + spark.streams.active.map(x => x.name).mkString(","))
spark.streams.awaitAnyTermination()
spark.streams.resetTerminated()
}