如何将完全形成的 SQL 与 spark 结构化流一起使用

How to use fully formed SQL with spark structured streaming

Spark 结构化流的文档说 - 从 spark 2.3 开始,spark 上下文中可用于 static DataFrame/DataSet 的所有方法都是也可用于 结构化流媒体 DataFrame/DataSet。但是,我还没有 运行 跨越任何 示例

使用完整的 SQL 比 DSL 对我来说更灵活、更有表现力、更有效率。此外,对于我的用例,那些 SQL 已经针对 static 版本进行了开发和测试。 必须需要一些返工——特别是使用join代替correlated subqueries。然而,保留整体浓郁的sql结构仍然有很大的价值。

我希望使用的格式就像这个假设的连接:

 val tabaDf = spark.readStream(..)
 val tabbDf = spark.readStream(..)

 val joinSql = """select a.*, 
                  b.productName 
                  from taba
                  join tabb 
                  on a.productId = b.productId
                  where ..
                  group by ..
                  having ..
                  order by .."""
 val joinedStreamingDf = spark.sql(joinSql)

有几项不清楚如何操作:

我能找到的所有示例都使用了 DSL and/or selectExpr() - 就像下面的 https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html

 df.selectExpr("CAST(userId AS STRING) AS key", "to_json(struct(*)) AS value")

或使用select

sightingLoc
  .groupBy("zip_code", window("start_time", "1 hour"))
  .count()
  .select( 
    to_json(struct("zip_code", "window")).alias("key"),
    col("count").cast("string").alias("value")) 

这些真的是唯一的选择吗——所以说 all 方法在 static dataframe/datasets 上支持的文档并不准确?否则:a任何关于如何纠正上述问题并直接使用 sql 流式传输的指示将不胜感激。

需要使用 createOrReplaceTempView 将流注册为临时视图。 AFAIK createOrReplaceView 不是 Spark API 的一部分(也许你有一些东西可以使用这种方法提供对 class 的隐式转换)。

spark.readStream(..).createOrReplaceTempView("taba")
spark.readStream(..).createOrReplaceTempView("tabb")

现在可以使用纯 SQL 访问视图。例如,要将输出打印到控制台:

spark
  .sql(joinSql)
  .writeStream
  .format("console")
  .start()
  .awaitTermination()

编辑:问题编辑后,我没有发现您的代码有任何问题。这是一个最小的工作示例。假设一个测试文件 /tmp/foo/foo.csv

"a",1
"b",2
import org.apache.spark.sql.types._
val schema = StructType(Array(StructField("s", StringType), StructField("i", IntegerType)))
spark.readStream
  .schema(schema)
  .csv("/tmp/foo")
  .createOrReplaceTempView("df1")
spark.readStream
  .schema(schema)
  .csv("/tmp/foo")
  .createOrReplaceTempView("df2")

spark.sql("SELECT * FROM df1 JOIN df2 USING (s)")
  .writeStream
  .format("console")
  .start()
  .awaitTermination()

产出

-------------------------------------------
Batch: 0
-------------------------------------------
+---+---+---+
|  s|  i|  i|
+---+---+---+
|  b|  2|  2|
|  a|  1|  1|
+---+---+---+