使用 Spark.SQL 编码的 Apache Spark 结构化流媒体
Structured Streaming with Apache Spark coded in Spark.SQL
使用 Databricks 的 Apache Spark 中的流转换通常使用 Scala 或 Python 进行编码。但是,有人可以告诉我是否也可以在 Delta 上的 SQL 中对流媒体进行编码吗?
例如,以下示例代码使用 PySpark 进行结构化流式传输,您能否告诉我 spark.SQL
中的等价物
simpleTransform = streaming.withColumn(" stairs", expr(" gt like '% stairs%'"))\
.where(" stairs")\
.where(" gt is not null")\
.select(" gt", "model", "arrival_time", "creation_time")\
.writeStream\
.queryName(" simple_transform")\
.format(" memory")\
.outputMode("update")\
.start()
您可以将该流式 DF 注册为临时视图,并对其执行查询。例如(为了简单起见,使用 rate
来源):
df=spark.readStream.format("rate").load()
df.createOrReplaceTempView("my_stream")
然后您可以直接对该视图执行 SQL 查询,例如 select * from my_stream
:
或者您可以创建另一个视图,应用您需要的任何转换。例如,如果我们使用这个 SQL 语句,我们可以 select 每 5 个值:
create or replace temp view my_derived as
select * from my_stream where (value % 5) == 0
然后使用 select * from my_derived
:
查询该视图
使用 Databricks 的 Apache Spark 中的流转换通常使用 Scala 或 Python 进行编码。但是,有人可以告诉我是否也可以在 Delta 上的 SQL 中对流媒体进行编码吗?
例如,以下示例代码使用 PySpark 进行结构化流式传输,您能否告诉我 spark.SQL
中的等价物simpleTransform = streaming.withColumn(" stairs", expr(" gt like '% stairs%'"))\
.where(" stairs")\
.where(" gt is not null")\
.select(" gt", "model", "arrival_time", "creation_time")\
.writeStream\
.queryName(" simple_transform")\
.format(" memory")\
.outputMode("update")\
.start()
您可以将该流式 DF 注册为临时视图,并对其执行查询。例如(为了简单起见,使用 rate
来源):
df=spark.readStream.format("rate").load()
df.createOrReplaceTempView("my_stream")
然后您可以直接对该视图执行 SQL 查询,例如 select * from my_stream
:
或者您可以创建另一个视图,应用您需要的任何转换。例如,如果我们使用这个 SQL 语句,我们可以 select 每 5 个值:
create or replace temp view my_derived as
select * from my_stream where (value % 5) == 0
然后使用 select * from my_derived
: