Calculating a moving average column using pyspark structured streaming
I'm using pyspark to process some incoming streaming data, and I would like to add a new column to my dataframe that contains a 50-second moving average.
I tried using a Window specification with rangeBetween:
import pyspark.sql.functions as F
from pyspark.sql.window import Window

w = (Window
     .partitionBy(F.col("sender"))
     .orderBy(F.col("event_time").cast("long"))
     .rangeBetween(-50, 0))
df2 = df.withColumn("rolling_average", F.avg("fr").over(w))
But this gives me an error, because structured streaming requires time-based windows (presumably to manage its state):
AnalysisException: Non-time-based windows are not supported on streaming DataFrames/Datasets
With the sql.window function I can also compute a moving average, but that gives me the result grouped by a tumbling (or hopping) window (and the unique ID key called sender):
df.select('sender', 'event_time', 'fr').groupBy("sender", window("event_time", "50 second")).avg().alias('avg_fr')
| sender | window | avg(fr) |
|--------|--------|---------|
| 59834cfd-6cb2-4ece-8353-0a9b20389656 | {"start":"2021-04-12T09:57:30.000+0000","end":"2021-04-12T09:58:20.000+0000"} | 0.17443667352199554 |
| 8b5d90b9-65d9-4dd2-b742-31c4f0ce37d6 | {"start":"2021-04-12T09:57:30.000+0000","end":"2021-04-12T09:58:20.000+0000"} | 0.010564474388957024 |
| a74204f3-e25d-4737-a302-9206cd69e90a | {"start":"2021-04-12T09:57:30.000+0000","end":"2021-04-12T09:58:20.000+0000"} | 0.16375258564949036 |
| db16426d-a9ba-449b-9777-3bdfadf0e0d9 | {"start":"2021-04-12T09:57:30.000+0000","end":"2021-04-12T09:58:20.000+0000"} | 0.17516431212425232 |
A tumbling window is clearly not what I want, and I would also somehow need to join the result back to the original table.
I'm not sure how to define a sliding window based on the irregular event timestamps that come in.
Right now I'm considering writing a stateful function that keeps a set of previously received records in state and updates it for every new data point that comes in. But for such a common task I'd hope this can be done in a simpler way.
Edit: the current version of Spark (3.1.1) only allows building arbitrary stateful functions in Java or Scala, not Python, presumably because of the conversion to and from the JVM.
Any ideas, if this really is the way to go?
You are getting the exception because it looks like you are building the Window for batch processing and not for a streaming DataFrame.
The Structured Streaming Programming Guide gives an example in the section Window Operations on Event-Time that can be applied to your use case:
streamDf = ...  # streaming DataFrame of schema { event_time: Timestamp, sender: String, fr: Integer }

# Group the data by window and sender and compute the average of each group
movingAverageDf = streamDf.groupBy(
    window(streamDf.event_time, "50 seconds", "5 seconds"),
    streamDf.sender
).avg("fr")
Keep in mind that without a watermark the internal state of your application will grow indefinitely, so it is recommended to also add one. Make sure the watermark uses the same event-time column as the window.
A note on the output mode of the streaming query: have a look at the overview in OutputModes to see which modes are supported for your streaming query.
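For illustration, a minimal sketch of how that could look end to end, assuming a 2-minute watermark and a memory sink for testing (the watermark duration and the query name are placeholders, not something prescribed by the guide):

from pyspark.sql import functions as F

movingAverageDf = (
    streamDf
        .withWatermark("event_time", "2 minutes")   # same event-time column as the window
        .groupBy(F.window("event_time", "50 seconds", "5 seconds"), "sender")
        .agg(F.avg("fr").alias("avg_fr"))
)

query = (
    movingAverageDf.writeStream
        .outputMode("append")      # rows are emitted once the watermark passes the window end
        .format("memory")          # in-memory sink, for testing only
        .queryName("moving_avg")   # placeholder table name
        .start()
)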
As requested by mike, a minimal reproducible example. This is the non-streaming case:
from pyspark.sql.functions import *
from pyspark.sql.types import *
import math
import datetime
rawData = [(1, "A", "2021-04-15T14:31:45.000", 1, 4.0),
(2, "A", "2021-04-15T14:32:46.000", 3, 5.0),
(3, "B", "2021-04-15T14:32:16.000", 8, 100.0),
(4, "B", "2021-04-15T14:33:16.000", 10, 200.0),
(5, "A", "2021-04-15T14:32:16.000", 2, -3.0),
(6, "B", "2021-04-15T14:32:47.000", 11, -500.0),
(7, "A", "2021-04-15T14:33:17.000", 0, 2.0)]
df = spark.createDataFrame(rawData).toDF("index", "sender", "event_time", "value1", "value2")
df = df.select(df["event_time"].astype("Timestamp").alias("ts"), "sender", "value1", "value2")
print(df.schema)
display(df)
| ts | sender | value1 | value2 |
|------------------------------|--------|--------|--------|
| 2021-04-15T14:31:45.000+0000 | A | 1 | 4 |
| 2021-04-15T14:32:46.000+0000 | A | 3 | 5 |
| 2021-04-15T14:32:16.000+0000 | B | 8 | 100 |
| 2021-04-15T14:33:16.000+0000 | B | 10 | 200 |
| 2021-04-15T14:32:16.000+0000 | A | 2 | -3 |
| 2021-04-15T14:32:47.000+0000 | B | 11 | -500 |
| 2021-04-15T14:33:17.000+0000 | A | 0 | 2 |
To add a new column with the moving average to this table, I first round the timestamp up into a new column with a resolution of 10 seconds, to avoid generating a window for every single second, which seemed inefficient. A watermark of 2 minutes is used to drop late data.
# Rounds a timestamp up to the next multiple of `round_to` seconds
@udf(returnType=TimestampType())
def round_time(dt=None, round_to=10):
    if dt.second % round_to == 0:
        s = dt.second
    else:
        s = (math.floor(dt.second / round_to) + 1) * round_to
    return dt + datetime.timedelta(seconds=s - dt.second)
df=df.withWatermark("ts", "2 minutes").select('*',round_time(df["ts"]).alias("trunct_time"))
display(df)
| ts | sender | value1 | value2 | trunct_time |
|------------------------------|--------|--------|--------|------------------------------|
| 2021-04-15T14:31:45.000+0000 | A | 1 | 4 | 2021-04-15T14:31:50.000+0000 |
| 2021-04-15T14:32:46.000+0000 | A | 3 | 5 | 2021-04-15T14:32:50.000+0000 |
| 2021-04-15T14:32:16.000+0000 | B | 8 | 100 | 2021-04-15T14:32:20.000+0000 |
| 2021-04-15T14:33:16.000+0000 | B | 10 | 200 | 2021-04-15T14:33:20.000+0000 |
| 2021-04-15T14:32:16.000+0000 | A | 2 | -3 | 2021-04-15T14:32:20.000+0000 |
| 2021-04-15T14:32:47.000+0000 | B | 11 | -500 | 2021-04-15T14:32:50.000+0000 |
| 2021-04-15T14:33:17.000+0000 | A | 0 | 2 | 2021-04-15T14:33:20.000+0000 |
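As an aside, the same 10-second round-up can also be done with built-in functions instead of a Python UDF; a sketch that should reproduce the behaviour of round_time above:

from pyspark.sql import functions as F

# Round the epoch seconds up to the next multiple of 10 and cast back to a timestamp
df_alt = df.withColumn(
    "trunct_time",
    (F.ceil(F.col("ts").cast("long") / 10) * 10).cast("timestamp")
)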
Now I compute the moving average over 50-second sliding windows that advance in 10-second increments.
avgDF = df.withWatermark("ts", "2 minutes").select('value1','sender','ts').groupBy("sender", window("ts", "50 second", '10 second')).avg()
avgDF = avgDF.withColumn("window_end", avgDF.window.end).withColumnRenamed('sender', 'sender2')
display(avgDF)
| sender2 | window | avg(value1) | window_end |
|---------|-------------------------------------------------------------------------------|-------------|------------------------------|
| A | {"start":"2021-04-15T14:31:10.000+0000","end":"2021-04-15T14:32:00.000+0000"} | 1 | 2021-04-15T14:32:00.000+0000 |
| A | {"start":"2021-04-15T14:31:00.000+0000","end":"2021-04-15T14:31:50.000+0000"} | 1 | 2021-04-15T14:31:50.000+0000 |
| A | {"start":"2021-04-15T14:31:20.000+0000","end":"2021-04-15T14:32:10.000+0000"} | 1 | 2021-04-15T14:32:10.000+0000 |
| A | {"start":"2021-04-15T14:31:40.000+0000","end":"2021-04-15T14:32:30.000+0000"} | 1.5 | 2021-04-15T14:32:30.000+0000 |
| A | {"start":"2021-04-15T14:31:30.000+0000","end":"2021-04-15T14:32:20.000+0000"} | 1.5 | 2021-04-15T14:32:20.000+0000 |
| A | {"start":"2021-04-15T14:32:40.000+0000","end":"2021-04-15T14:33:30.000+0000"} | 1.5 | 2021-04-15T14:33:30.000+0000 |
| B | {"start":"2021-04-15T14:31:50.000+0000","end":"2021-04-15T14:32:40.000+0000"} | 8 | 2021-04-15T14:32:40.000+0000 |
| A | {"start":"2021-04-15T14:32:30.000+0000","end":"2021-04-15T14:33:20.000+0000"} | 1.5 | 2021-04-15T14:33:20.000+0000 |
Because the windows slide, we end up with additional rows in the aggregated table (only partially shown above).
Now we join the two tables back together, matching each record's rounded-up timestamp against the end of the sliding window it falls into:
joined_stream=df.join(
avgDF,
expr("""
trunct_time = window_end AND
sender = sender2
"""),
"leftOuter"
)
display(joined_stream.select('ts','sender','value1','value2','avg(value1)'))
|ts |sender|value1|value2|avg(value1)|
|----------------------------|------|------|------|-----------|
|2021-04-15T14:31:45.000+0000|A |1 |4 |1 |
|2021-04-15T14:32:46.000+0000|A |3 |5 |2.5 |
|2021-04-15T14:32:16.000+0000|B |8 |100 |8 |
|2021-04-15T14:33:16.000+0000|B |10 |200 |10.5 |
|2021-04-15T14:32:16.000+0000|A |2 |-3 |1.5 |
|2021-04-15T14:32:47.000+0000|B |11 |-500 |9.5 |
|2021-04-15T14:33:17.000+0000|A |0 |2 |1.5 |
The end result is what I was after (*).
(*) However, the result may not be exactly as expected, because there can be a mismatch between the original timestamp resolution (seconds) and the aggregation into 10-second buckets.
For the streaming version I do essentially the same as in the non-streaming solution posted above:
schema = StructType([
    StructField("ts", TimestampType(), True),
    StructField("sender", StringType(), True),
    StructField("value1", LongType(), True),
    StructField("value2", FloatType(), True)
])
df = spark.readStream.schema(schema).format("csv").load("dbfs:/FileStore/shared_uploads/joris.vanagtmaal@wartsilaazure.com/raw_data*")
df=df.withWatermark("ts", "2 minutes").select('*',round_time(df["ts"]).alias("trunct_time"))
avgDF = df.withWatermark("ts", "2 minutes").select('value1','sender','ts').groupBy("sender", window("ts", "50 second", '10 second')).avg()
avgDF = avgDF.withColumn("window_end", avgDF.window.end).withColumnRenamed('sender', 'sender2').withWatermark("window_end", "2 minutes")
joined_stream=df.join(
avgDF,
expr("""
trunct_time = window_end AND
sender = sender2
"""),
"leftOuter"
)
query = (
joined_stream
.writeStream
.format("memory") # memory = store in-memory table (for testing only)
.queryName("joined") # joined = name of the in-memory table
.outputMode("append") # append = allows stream on stream joins
.start()
)
This results in the following error:
AnalysisException: Detected pattern of possible 'correctness' issue due to global watermark. The query contains stateful operation which can emit rows older than the current watermark plus allowed late record delay, which are "late rows" in downstream stateful operations and these rows can be discarded. Please refer the programming guide doc for more details. If you understand the possible risk of correctness issue and still need to run the query, you can disable this check by setting the config `spark.sql.streaming.statefulOperator.checkCorrectness.enabled` to false.;
The documentation mentions:
Any of the stateful operation(s) after any of below stateful operations can have this issue:
streaming aggregation in Append mode or
stream-stream outer join
There’s a known workaround: split your streaming query into multiple queries per stateful operator, and ensure end-to-end exactly once per query. Ensuring end-to-end exactly once for the last query is optional.
But this is a rather cryptic description of how to actually solve the issue. Based on https://issues.apache.org/jira/browse/SPARK-28074:
It means split the queries into multiple steps with 1 stateful operation each and persist the intermediate results to topics. This produces mostly reproducible results. But of course it increases the overall delay of the messages passing through.
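A rough sketch of what such a split could look like here, using a file sink instead of topics (the paths, checkpoint locations and the re-applied watermark are assumptions, and the outer join in the second query still has to satisfy the stream-stream join requirements):

# Query 1: run the windowed aggregation as its own streaming query and persist it
(avgDF.writeStream
      .format("parquet")
      .option("path", "/tmp/avg_df")                     # hypothetical location
      .option("checkpointLocation", "/tmp/avg_df_chk")   # hypothetical location
      .outputMode("append")
      .start())

# Query 2: read the persisted aggregation back as a stream and join it with the raw input
avg_back = (spark.readStream
                 .schema(avgDF.schema)                   # reuse the aggregation's schema
                 .parquet("/tmp/avg_df")
                 .withWatermark("window_end", "2 minutes"))

(df.join(avg_back,
         expr("trunct_time = window_end AND sender = sender2"),
         "leftOuter")
   .writeStream
   .format("memory")
   .queryName("joined")
   .outputMode("append")
   .start())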
Depending on the setup this may or may not be the right solution, but for this example I decided to set the correctness-check parameter to false, so that it no longer throws the exception and only writes a warning to the log.
%sql set spark.sql.streaming.statefulOperator.checkCorrectness.enabled=False
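Equivalently, the flag can be set from PySpark directly:

spark.conf.set("spark.sql.streaming.statefulOperator.checkCorrectness.enabled", "false")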
Now it gives me the result I wanted:
%sql select * from joined
| ts | sender | value1 | value2 | avg(value1) |
|------------------------------|--------|--------|--------|-------------|
| 2021-04-15T14:33:16.000+0000 | B | 10 | 200 | 10.5 |
| 2021-04-15T14:32:47.000+0000 | B | 11 | -500 | 9.5 |
| 2021-04-15T14:31:45.000+0000 | A | 1 | 4 | 1 |
| 2021-04-15T14:32:16.000+0000 | A | 2 | -3 | 1.5 |
| 2021-04-15T14:32:46.000+0000 | A | 3 | 5 | 2.5 |
| 2021-04-15T14:33:17.000+0000 | A | 0 | 2 | 1.5 |
| 2021-04-15T14:32:16.000+0000 | B | 8 | 100 | 8 |
One more caveat: these results only become visible once they are followed by a new data point that moves the watermark beyond the threshold (here 2 minutes). In a streaming application that is not a problem, but for this example I added a new 8th data point a few minutes later, which for the same reason is of course not visible in the output itself.