How to access the data from a streaming query in a "memory" table for subsequent batch queries?
Given a writeStream call:
val outDf = (sdf.writeStream
.outputMode(outputMode)
.format("memory")
.queryName("MyInMemoryTable")
.trigger(Trigger.ProcessingTime(interval))
.start())
how can I run a SQL query against MyInMemoryTable, for example:
val df = spark.sql("""select Origin,Dest,Carrier,avg(DepDelay) avgDepDelay
from MyInMemoryTable group by 1,2,3""")
The Spark Structured Streaming documentation says that batch and streaming queries can be mixed, but the above does not work:
'writeStream' can be called only on streaming Dataset/DataFrame;
org.apache.spark.sql.AnalysisException: 'writeStream' can be called only
on streaming Dataset/DataFrame;
So how can the in-memory table be used in subsequent queries?
The following post on the Hortonworks site looks promising: https://community.hortonworks.com/questions/181979/spark-structured-streaming-formatmemory-is-showing.html
Here is the example writeStream from that post, which has the same form as my original question:
StreamingQuery initDF = df.writeStream()
.outputMode("append")
.format("memory")
.queryName("initDF")
.trigger(Trigger.ProcessingTime(1000))
.start();
sparkSession.sql("select * from initDF").show();
initDF.awaitTermination();
Here is the reply:
Okay, the way it works is:
In simple terms, think of it this way: the main thread of your code launches another thread in which your streaming query logic runs.
Meanwhile, your main code is blocked by initDF.awaitTermination().
sparkSession.sql("select * from initDF").show() => This code runs on the main thread, and it is reached only once, the first time through.
So update your code to:
StreamingQuery initDF = df.writeStream()
.outputMode("append")
.format("memory")
.queryName("initDF")
.trigger(Trigger.ProcessingTime(1000))
.start();
while (initDF.isActive()) {
  Thread.sleep(10000);
  sparkSession.sql("select * from initDF").show();
}
Now the main thread of your code will go through the loop over and over again, querying the table on each pass.
Applying the suggestion to my code gives:
while(outDf.isActive) {
Thread.sleep(30000)
strmSql(s"select * from $table", doCnt = false, show = true, nRows = 200)
}
outDf.awaitTermination(1 * 20000)
Update: this works. I see updated results after each micro-batch.
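For anyone who wants to try the polling approach end to end, here is a minimal self-contained sketch. It is not my actual job: the built-in "rate" source stands in for my real input stream (sdf), and the local master, the 2 second trigger, the 5 second sleep, the five-poll limit, and the bucket aggregation are all assumptions chosen for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object MemorySinkPolling {
  def main(args: Array[String]): Unit = {
    // Assumption: a local session, only for trying this out.
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("MemorySinkPolling")
      .getOrCreate()

    // Assumption: the built-in "rate" source replaces the real input.
    // It produces two columns: timestamp and value.
    val sdf = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .load()

    // Same shape as the writeStream call in the question.
    // start() returns immediately; the query runs on its own thread.
    val query = sdf.writeStream
      .outputMode("append")
      .format("memory")
      .queryName("MyInMemoryTable")
      .trigger(Trigger.ProcessingTime("2 seconds"))
      .start()

    // Poll the in-memory table from the main thread with a batch query.
    // The poll limit is hypothetical so that the sketch terminates.
    var polls = 0
    while (query.isActive && polls < 5) {
      Thread.sleep(5000)
      spark.sql(
        """select value % 3 as bucket, count(*) as n
          |from MyInMemoryTable group by 1 order by 1""".stripMargin).show()
      polls += 1
    }

    query.stop()
    spark.stop()
  }
}
```

Each pass through the loop is an ordinary batch query over whatever the streaming query has written to the table so far, so the counts should grow from one poll to the next.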