How to access the data from a streaming query in a "memory" table for subsequent batch queries?

Given a writeStream call:

val outDf = (sdf.writeStream
  .outputMode(outputMode)
  .format("memory")
  .queryName("MyInMemoryTable")
  .trigger(Trigger.ProcessingTime(interval))
  .start())

how can I run SQL against MyInMemoryTable, e.g.

  val df = spark.sql("""select Origin,Dest,Carrier,avg(DepDelay) avgDepDelay 
                from MyInMemoryTable group by 1,2,3""")

The Spark Structured Streaming documentation says that batch and streaming queries can be mixed, but the above does not work:

org.apache.spark.sql.AnalysisException: 'writeStream' can be called only 
   on streaming Dataset/DataFrame;

So how can the in-memory table be used in subsequent queries?

The following post on the Hortonworks site seems promising: https://community.hortonworks.com/questions/181979/spark-structured-streaming-formatmemory-is-showing.html Here is its sample writeStream - it has the same form as the one in my original question:

 StreamingQuery initDF = df.writeStream()
          .outputMode("append")
          .format("memory")
          .queryName("initDF")
          .trigger(Trigger.ProcessingTime(1000))
          .start();
sparkSession.sql("select * from initDF").show();

initDF.awaitTermination();

Here is the reply:

Okay, the way it works is:

In simple terms, think of it like this: the main thread of your code launches another thread in which your streaming-query logic runs.

Meanwhile, your main code is blocked by

  initDF.awaitTermination();

sparkSession.sql("select * from initDF").show() => this code runs on the main thread, and it only reaches that line the first time.

So update your code to:

StreamingQuery initDF = df.writeStream()
        .outputMode("append")
        .format("memory")
        .queryName("initDF")
        .trigger(Trigger.ProcessingTime(1000))
        .start();

while (initDF.isActive()) {
    Thread.sleep(10000);
    sparkSession.sql("select * from initDF").show();
}

Now the main thread of your code goes through the loop over and over, querying the table on each pass.

Applying the suggestion to my code gives:

while(outDf.isActive) {
  Thread.sleep(30000)
  strmSql(s"select * from $table", doCnt = false, show = true, nRows = 200)
}
outDf.awaitTermination(1 * 20000)

Update: this works well: I see updated results after each mini-batch.
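For reference, here is a self-contained sketch of the whole working pattern. It is an illustration under stated assumptions, not the original job: it uses `MemoryStream` (`org.apache.spark.sql.execution.streaming.MemoryStream`, an internal class Spark's own tests use) as a stand-in source so it runs without an external stream, and the single-column `Int` schema, local master, and bounded poll count are all placeholders:

```scala
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.Trigger

object MemorySinkDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("memory-sink-demo")
      .getOrCreate()
    import spark.implicits._
    implicit val sqlCtx: SQLContext = spark.sqlContext

    // In-memory source standing in for the real input stream.
    val source = MemoryStream[Int]

    // The memory sink registers a temp view under the queryName,
    // which subsequent batch queries can read via spark.sql.
    val query = source.toDF().writeStream
      .outputMode("append")
      .format("memory")
      .queryName("MyInMemoryTable")
      .trigger(Trigger.ProcessingTime("1 second"))
      .start()

    source.addData(1, 2, 3)

    // Poll from the main thread while the query runs on its own thread;
    // each pass sees the table as of the latest completed mini-batch.
    var polls = 0
    while (query.isActive && polls < 5) {
      Thread.sleep(2000)
      spark.sql("select count(*) as n from MyInMemoryTable").show()
      polls += 1
    }

    query.stop()
    spark.stop()
  }
}
```

The key point is the same as in the accepted reply: `spark.sql(...)` runs on the main thread, so it must sit inside a loop (rather than before a blocking `awaitTermination()`) to observe results from later mini-batches.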