Multiple writeStream with Spark Streaming
I'm using Spark Streaming, and I'm running into problems when trying to implement multiple writeStreams.
Below is my code:
DataWriter.writeStreamer(firstTableData, "parquet", CheckPointConf.firstCheckPoint, OutputConf.firstDataOutput)
DataWriter.writeStreamer(secondTableData, "parquet", CheckPointConf.secondCheckPoint, OutputConf.secondDataOutput)
DataWriter.writeStreamer(thirdTableData, "parquet", CheckPointConf.thirdCheckPoint, OutputConf.thirdDataOutput)
writeStreamer is defined as follows:
def writeStreamer(input: DataFrame, format: String, checkPointFolder: String, output: String) = {
  val query = input
    .writeStream
    .format(format)
    .option("checkpointLocation", checkPointFolder)
    .option("path", output)
    .outputMode(OutputMode.Append)
    .start()
  query.awaitTermination()
}
The problem I'm facing is that only the first table is written by the Spark writeStream; nothing happens for the other tables.
Do you have any ideas about this?
By default the number of concurrent jobs is 1, which means that only 1 job will be active at a time.
Have you tried increasing the number of possible concurrent jobs in the Spark conf?
sparkConf.set("spark.streaming.concurrentJobs", "3")
Not an official source: http://why-not-learn-something.blogspot.com/2016/06/spark-streaming-performance-tuning-on.html
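For reference, a minimal sketch of where that setting would go when building the session (the app name is illustrative; note that this setting predates Structured Streaming, so its effect on writeStream queries is not guaranteed):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Apply the setting on the SparkConf before the session is created
val sparkConf = new SparkConf()
  .setAppName("multi-writestream-app") // illustrative name
  .set("spark.streaming.concurrentJobs", "3")

val spark = SparkSession.builder()
  .config(sparkConf)
  .getOrCreate()
```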
query.awaitTermination() should be called only after the last stream is created.
The writeStreamer function can be modified to return a StreamingQuery instead of calling awaitTermination at that point (since it is blocking):
def writeStreamer(input: DataFrame, format: String, checkPointFolder: String, output: String): StreamingQuery = {
  input
    .writeStream
    .format(format)
    .option("checkpointLocation", checkPointFolder)
    .option("path", output)
    .outputMode(OutputMode.Append)
    .start()
}
Then you will have:
val query1 = DataWriter.writeStreamer(...)
val query2 = DataWriter.writeStreamer(...)
val query3 = DataWriter.writeStreamer(...)
query3.awaitTermination()
If you want the writes to run in parallel, you can use
sparkSession.streams.awaitAnyTermination()
and remove query.awaitTermination() from the writeStreamer method.
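Putting the pieces together, a minimal sketch of the non-blocking pattern (assuming `spark` is the active SparkSession and the three DataFrames are defined as in the question; the checkpoint and output paths are illustrative):

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}

object DataWriter {
  // Start the streaming write and return the query handle without blocking
  def writeStreamer(input: DataFrame, format: String,
                    checkPointFolder: String, output: String): StreamingQuery = {
    input
      .writeStream
      .format(format)
      .option("checkpointLocation", checkPointFolder)
      .option("path", output)
      .outputMode(OutputMode.Append)
      .start()
  }
}

// All three queries are started before anything blocks
DataWriter.writeStreamer(firstTableData, "parquet", "/chk/first", "/out/first")
DataWriter.writeStreamer(secondTableData, "parquet", "/chk/second", "/out/second")
DataWriter.writeStreamer(thirdTableData, "parquet", "/chk/third", "/out/third")

// Block the driver until any of the active queries terminates (or fails)
spark.streams.awaitAnyTermination()
```

Note that awaitAnyTermination() returns as soon as one query stops; if the driver should keep running as long as any query is alive, call it in a loop or use awaitTermination() on each query handle in turn.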