如何将完全形成的 SQL 与 spark 结构化流一起使用
How to use fully formed SQL with spark structured streaming
Spark 结构化流的文档说 - 从 spark 2.3 开始,spark 上下文中可用于 static DataFrame
/DataSet
的所有方法都是也可用于 结构化流媒体 DataFrame
/DataSet
。但是,我还没有 运行 跨越任何 示例 。
使用完整的 SQL 比 DSL
对我来说更灵活、更有表现力、更有效率。此外,对于我的用例,那些 SQL 已经针对 static 版本进行了开发和测试。 必须需要一些返工——特别是使用join
代替correlated subqueries
。然而,保留整体浓郁的sql结构仍然有很大的价值。
我希望使用的格式就像这个假设的连接:
val tabaDf = spark.readStream(..)
val tabbDf = spark.readStream(..)
val joinSql = """select a.*,
b.productName
from taba
join tabb
on a.productId = b.productId
where ..
group by ..
having ..
order by .."""
val joinedStreamingDf = spark.sql(joinSql)
有几项不清楚如何操作:
tabaDf
和tabbDf
是否应该通过spark.readStream
定义:这是我的假设
如何声明 taba
和 tabb
。尝试使用
tabaDf.createOrReplaceTempView("taba")
tabbDf.createOrReplaceTempView("tabb")
结果
WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
我能找到的所有示例都使用了 DSL
and/or selectExpr()
- 就像下面的 https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html
df.selectExpr("CAST(userId AS STRING) AS key", "to_json(struct(*)) AS value")
或使用select
:
sightingLoc
.groupBy("zip_code", window("start_time", "1 hour"))
.count()
.select(
to_json(struct("zip_code", "window")).alias("key"),
col("count").cast("string").alias("value"))
这些真的是唯一的选择吗——所以说 all 方法在 static
dataframe/datasets 上支持的文档并不准确?否则:a任何关于如何纠正上述问题并直接使用 sql
流式传输的指示将不胜感激。
需要使用 createOrReplaceTempView
将流注册为临时视图。 AFAIK createOrReplaceView
不是 Spark API 的一部分(也许你有一些东西可以使用这种方法提供对 class 的隐式转换)。
spark.readStream(..).createOrReplaceTempView("taba")
spark.readStream(..).createOrReplaceTempView("tabb")
现在可以使用纯 SQL 访问视图。例如,要将输出打印到控制台:
spark
.sql(joinSql)
.writeStream
.format("console")
.start()
.awaitTermination()
编辑:问题编辑后,我没有发现您的代码有任何问题。这是一个最小的工作示例。假设一个测试文件 /tmp/foo/foo.csv
"a",1
"b",2
import org.apache.spark.sql.types._
val schema = StructType(Array(StructField("s", StringType), StructField("i", IntegerType)))
spark.readStream
.schema(schema)
.csv("/tmp/foo")
.createOrReplaceTempView("df1")
spark.readStream
.schema(schema)
.csv("/tmp/foo")
.createOrReplaceTempView("df2")
spark.sql("SELECT * FROM df1 JOIN df2 USING (s)")
.writeStream
.format("console")
.start()
.awaitTermination()
产出
-------------------------------------------
Batch: 0
-------------------------------------------
+---+---+---+
| s| i| i|
+---+---+---+
| b| 2| 2|
| a| 1| 1|
+---+---+---+
Spark 结构化流的文档说 - 从 spark 2.3 开始,spark 上下文中可用于 static DataFrame
/DataSet
的所有方法都是也可用于 结构化流媒体 DataFrame
/DataSet
。但是,我还没有 运行 跨越任何 示例 。
使用完整的 SQL 比 DSL
对我来说更灵活、更有表现力、更有效率。此外,对于我的用例,那些 SQL 已经针对 static 版本进行了开发和测试。 必须需要一些返工——特别是使用join
代替correlated subqueries
。然而,保留整体浓郁的sql结构仍然有很大的价值。
我希望使用的格式就像这个假设的连接:
val tabaDf = spark.readStream(..)
val tabbDf = spark.readStream(..)
val joinSql = """select a.*,
b.productName
from taba
join tabb
on a.productId = b.productId
where ..
group by ..
having ..
order by .."""
val joinedStreamingDf = spark.sql(joinSql)
有几项不清楚如何操作:
tabaDf
和tabbDf
是否应该通过spark.readStream
定义:这是我的假设如何声明
taba
和tabb
。尝试使用tabaDf.createOrReplaceTempView("taba") tabbDf.createOrReplaceTempView("tabb")
结果
WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
我能找到的所有示例都使用了 DSL
and/or selectExpr()
- 就像下面的 https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html
df.selectExpr("CAST(userId AS STRING) AS key", "to_json(struct(*)) AS value")
或使用select
:
sightingLoc
.groupBy("zip_code", window("start_time", "1 hour"))
.count()
.select(
to_json(struct("zip_code", "window")).alias("key"),
col("count").cast("string").alias("value"))
这些真的是唯一的选择吗——所以说 all 方法在 static
dataframe/datasets 上支持的文档并不准确?否则:a任何关于如何纠正上述问题并直接使用 sql
流式传输的指示将不胜感激。
需要使用 createOrReplaceTempView
将流注册为临时视图。 AFAIK createOrReplaceView
不是 Spark API 的一部分(也许你有一些东西可以使用这种方法提供对 class 的隐式转换)。
spark.readStream(..).createOrReplaceTempView("taba")
spark.readStream(..).createOrReplaceTempView("tabb")
现在可以使用纯 SQL 访问视图。例如,要将输出打印到控制台:
spark
.sql(joinSql)
.writeStream
.format("console")
.start()
.awaitTermination()
编辑:问题编辑后,我没有发现您的代码有任何问题。这是一个最小的工作示例。假设一个测试文件 /tmp/foo/foo.csv
"a",1
"b",2
import org.apache.spark.sql.types._
val schema = StructType(Array(StructField("s", StringType), StructField("i", IntegerType)))
spark.readStream
.schema(schema)
.csv("/tmp/foo")
.createOrReplaceTempView("df1")
spark.readStream
.schema(schema)
.csv("/tmp/foo")
.createOrReplaceTempView("df2")
spark.sql("SELECT * FROM df1 JOIN df2 USING (s)")
.writeStream
.format("console")
.start()
.awaitTermination()
产出
-------------------------------------------
Batch: 0
-------------------------------------------
+---+---+---+
| s| i| i|
+---+---+---+
| b| 2| 2|
| a| 1| 1|
+---+---+---+