Spark Structured Streaming - 构建 window 区间
Spark Structured Streaming - constructing window interval
我加入了一个流数据帧(由下面的前四列组成)和一个静态数据帧(提供最后两列)以生成具有以下结构的新流数据帧(称为 first_agg_sdf):
+---------+--------+-----+-------------------+-----+------------+
|elementid|metricid|value| epoch|sigma|windowlength|
+---------+--------+-----+-------------------+-----+------------+
| 2| 6|41.01|2018-02-28 16:56:10| 3.3| 5 minutes|
| 2| 6|61.45|2018-02-28 16:56:24| 3.3| 5 minutes|
| 2| 6| 9.13|2018-02-28 16:56:51| 3.3| 5 minutes|
| 2| 6|34.21|2018-02-28 16:57:19| 3.3| 5 minutes|
| 2| 5|43.25|2018-02-28 16:56:10| 3.2| 3 minutes|
| 2| 5| 4.96|2018-02-28 16:56:24| 3.2| 3 minutes|
| 2| 5|22.81|2018-02-28 16:56:51| 3.2| 3 minutes|
| 2| 5| 0.04|2018-02-28 16:57:19| 3.2| 3 minutes|
这有架构:
root
|-- elementid: integer (nullable = true)
|-- metricid: integer (nullable = true)
|-- value: float (nullable = true)
|-- epoch: timestamp (nullable = true)
|-- sigma: double (nullable = true)
|-- windowlength: string (nullable = true)
然后我想生成一个滑动 window,它在 elementid、metricid 上聚合 window 持续时间,该持续时间由该行的 windowlength 列中的值给出。
我生成了以下代码:
first_agg_window = first_agg_sdf \
.withWatermark("epoch", "30 seconds") \
.groupBy(
window(timeColumn="epoch", windowDuration="windowlength", slideDuration="30 seconds"),
"elementid",
"metricid")
.agg(stddev_pop("value").alias("movingstd"), avg("value").alias("movingavg"), last("value").alias("value"))
如果我像这样向 windowDuration 属性提供字符串,则上述 window 聚合可以正常工作:
window持续时间="5 分钟"。
但是,如果我像这样使用数据框列值:
window持续时间="windowlength"
我收到以下错误:
Traceback (most recent call last):
File "/home/ec2-user/spark/spark-2.2.0-bin-
hadoop2.7/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
File "/home/ec2-user/spark/spark-2.2.0-bin-hadoop2.7/python/lib/py4j-
0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling
z:org.apache.spark.sql.functions.window.
: java.lang.IllegalArgumentException: The provided interval
(windowlength) did not correspond to a valid interval string.at org.apache.spark.sql.catalyst.expressions.TimeWindow.getIntervalInMicroSeconds(TimeWindow.scala:120) at org.apache.spark.sql.catalyst.expressions.TimeWindow$.apply(TimeWindow.scala:148)
at org.apache.spark.sql.functions$.window(functions.scala:2805)
at org.apache.spark.sql.functions$.window(functions.scala:2852)
at org.apache.spark.sql.functions.window(functions.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Unknown Source)
如何将每行中的列值 windowlength 传递到属性 windowDuration不会产生上述错误?
不幸的是,它不是那样工作的。 window 持续时间是流式查询生命周期的固定值。它只能在流式查询开始时设置。
您很可能必须使用两个流式查询,每个 window 持续时间一个。
尝试编写两个流式查询:
1.阅读源码
2. 按 window 持续时间过滤行
3.聚合
4. Optional-join稍后返回数据
我加入了一个流数据帧(由下面的前四列组成)和一个静态数据帧(提供最后两列)以生成具有以下结构的新流数据帧(称为 first_agg_sdf):
+---------+--------+-----+-------------------+-----+------------+
|elementid|metricid|value| epoch|sigma|windowlength|
+---------+--------+-----+-------------------+-----+------------+
| 2| 6|41.01|2018-02-28 16:56:10| 3.3| 5 minutes|
| 2| 6|61.45|2018-02-28 16:56:24| 3.3| 5 minutes|
| 2| 6| 9.13|2018-02-28 16:56:51| 3.3| 5 minutes|
| 2| 6|34.21|2018-02-28 16:57:19| 3.3| 5 minutes|
| 2| 5|43.25|2018-02-28 16:56:10| 3.2| 3 minutes|
| 2| 5| 4.96|2018-02-28 16:56:24| 3.2| 3 minutes|
| 2| 5|22.81|2018-02-28 16:56:51| 3.2| 3 minutes|
| 2| 5| 0.04|2018-02-28 16:57:19| 3.2| 3 minutes|
这有架构:
root
|-- elementid: integer (nullable = true)
|-- metricid: integer (nullable = true)
|-- value: float (nullable = true)
|-- epoch: timestamp (nullable = true)
|-- sigma: double (nullable = true)
|-- windowlength: string (nullable = true)
然后我想生成一个滑动 window,它在 elementid、metricid 上聚合 window 持续时间,该持续时间由该行的 windowlength 列中的值给出。
我生成了以下代码:
first_agg_window = first_agg_sdf \
.withWatermark("epoch", "30 seconds") \
.groupBy(
window(timeColumn="epoch", windowDuration="windowlength", slideDuration="30 seconds"),
"elementid",
"metricid")
.agg(stddev_pop("value").alias("movingstd"), avg("value").alias("movingavg"), last("value").alias("value"))
如果我像这样向 windowDuration 属性提供字符串,则上述 window 聚合可以正常工作:
window持续时间="5 分钟"。
但是,如果我像这样使用数据框列值:
window持续时间="windowlength"
我收到以下错误:
Traceback (most recent call last):
File "/home/ec2-user/spark/spark-2.2.0-bin-
hadoop2.7/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
File "/home/ec2-user/spark/spark-2.2.0-bin-hadoop2.7/python/lib/py4j-
0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling
z:org.apache.spark.sql.functions.window.
: java.lang.IllegalArgumentException: The provided interval
(windowlength) did not correspond to a valid interval string.at org.apache.spark.sql.catalyst.expressions.TimeWindow.getIntervalInMicroSeconds(TimeWindow.scala:120) at org.apache.spark.sql.catalyst.expressions.TimeWindow$.apply(TimeWindow.scala:148)
at org.apache.spark.sql.functions$.window(functions.scala:2805)
at org.apache.spark.sql.functions$.window(functions.scala:2852)
at org.apache.spark.sql.functions.window(functions.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Unknown Source)
如何将每行中的列值 windowlength 传递到属性 windowDuration不会产生上述错误?
不幸的是,它不是那样工作的。 window 持续时间是流式查询生命周期的固定值。它只能在流式查询开始时设置。
您很可能必须使用两个流式查询,每个 window 持续时间一个。
尝试编写两个流式查询: 1.阅读源码 2. 按 window 持续时间过滤行 3.聚合 4. Optional-join稍后返回数据