如何在单个 Spark 应用程序中启动多个流式查询?

How to start multiple streaming queries in a single Spark application?

我在 EMR 上为 运行 构建了几个 Spark 结构化流查询,它们是很长的 运行ning 查询,并且需要一直 运行,因为它们都是 ETL类型查询,当我向 EMR 上的 YARN 集群提交作业时,我可以提交单个 spark 应用程序。这样 spark 应用程序应该有多个流式查询。

我对如何以编程方式build/start同一提交中的多个流式查询感到困惑。

例如:我有这个代码:

case class SparkJobs(prop: Properties) extends Serializable {
  def run() = {
      Type1SparkJobBuilder(prop).build().awaitTermination()
      Type1SparkJobBuilder(prop).build().awaitTermination()
  }
}

我在主 class 中用 SparkJobs(new Properties()).run()

触发了这个

当我在 spark 历史服务器中看到时,只有第一个 spark 流作业 (Type1SparkJob) 是 运行ning。

以编程方式在同一个 spark 提交中触发多个流式查询的推荐方法是什么,我也找不到合适的文档。

由于您在第一个查询上调用 awaitTermination,它将阻塞直到完成,然后再开始第二个查询。所以你想启动两个查询,然后使用 StreamingQueryManager.awaitAnyTermination.

val query1 = df.writeStream.start()
val query2 = df.writeStream.start()

spark.streams.awaitAnyTermination()

除上述之外,默认情况下,Spark 使用 FIFO 调度程序。这意味着第一个查询在执行时获取集群中的所有资源。由于您正在尝试同时 运行 多个查询,因此您应该切换到 FAIR scheduler

如果您有一些查询应该拥有比其他查询更多的资源,那么您还可以调整各个调度程序池。

val query1=ds.writeSteam.{...}.start()

val query2=ds.writeSteam.{...}.start()

val query3=ds.writeSteam.{...}.start()

query3.awaitTermination()

AwaitTermination() 将阻止您的进程直到完成,这在流式应用程序中永远不会发生,在您最后一次查询时调用它应该可以解决您的问题

这取决于您是否要退出任何查询 stops/fails 或所有查询 stop/fail

如果退出任何查询,请使用 spark.streams.awaitAnyTermination()

如果退出所有查询:

选项 1:

val query1=ds.writeSteam.{...}.start()
val query2=ds.writeSteam.{...}.start()
query1.awaitTermination();
query2.awaitTermination();

选项 2:

val query1=ds.writeSteam.{...}.start()
val query2=ds.writeSteam.{...}.start()
spark.streams.active.foreach(x => x.awaitTermination())

选项 3:

while (!spark.streams.active.isEmpty) {
  println("Queries currently still active: " + spark.streams.active.map(x => x.name).mkString(","))
  spark.streams.awaitAnyTermination()
  spark.streams.resetTerminated()
}