Spark Structured Streaming watermark corresponding to deviceId
The incoming data is a stream like the one shown below, consisting of 3 columns:
[
system -> deviceId,
time -> eventTime,
value -> some metric
]
+-------+-------------------+-----+
|system |time |value|
+-------+-------------------+-----+
|system1|2019-08-20 07:13:10|1.5 |
|system2|2019-08-20 07:11:10|1.9 |
|system3|2019-08-20 07:13:15|1.3 |
|system1|2019-08-20 07:13:20|1.8 |
|system2|2019-08-20 07:11:20|1.6 |
|system3|2019-08-20 07:13:25|1.4 |
|system1|2019-08-20 07:13:30|1.2 |
|system2|2019-08-20 07:11:30|1.1 |
|system3|2019-08-20 07:13:35|1.5 |
+-------+-------------------+-----+
Each device generates data at a fixed interval of [10 seconds].
I have a Spark Structured Streaming application that computes the max value with:
Window duration = 30 seconds
Slide duration = 30 seconds
import org.apache.spark.sql.functions.{col, max, window}

df.withWatermark("time", "30 seconds")
  .groupBy(
    window(col("time"), "30 seconds", "30 seconds"),
    col("system")
  )
  .agg(max("value"))
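For context, a minimal end-to-end sketch of such a query might look like the following. The Kafka source, bootstrap servers, topic name, JSON parsing, console sink, and output mode are all assumptions for illustration, not part of the original question:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json, max, window}
import org.apache.spark.sql.types._

// Hypothetical self-contained sketch; source and sink are placeholders.
val spark = SparkSession.builder.appName("per-device-max").getOrCreate()

val schema = new StructType()
  .add("system", StringType)   // deviceId
  .add("time", TimestampType)  // eventTime
  .add("value", DoubleType)    // some metric

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // assumption
  .option("subscribe", "metrics")                       // assumed topic name
  .load()
  .select(from_json(col("value").cast("string"), schema).as("m"))
  .select("m.*")

val maxPerDevice = df
  .withWatermark("time", "30 seconds")
  .groupBy(window(col("time"), "30 seconds", "30 seconds"), col("system"))
  .agg(max("value").as("max_value"))

// "update" emits changed window results every trigger; "append" would wait
// until the watermark passes the window end.
maxPerDevice.writeStream
  .outputMode("update")
  .format("console")
  .start()
  .awaitTermination()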
The problem
Since each device is independent, each one also has its own clock. For various reasons, such as [network issues, heavy device load, etc.], a device may stall and send its data late.
Since a single job processes the data, it will then start dropping the stalled device's data based on the watermark, and we are losing data.
Is there any way or workaround to tie the watermark to the device ID, so that the job maintains the watermark per [deviceId, eventTime] and does not drop a device's data just because other devices are ahead?
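To make the concern concrete, here is a small illustration (my own arithmetic, using the sample rows above and the 30-second threshold from the query) of how the single, query-wide watermark is driven by the fastest device:

import java.time.{Duration, Instant}

// Illustration only: how the global watermark advances with the sample data.
// The fastest device (system3) has reached 07:13:35, so:
val maxEventTimeSeen = Instant.parse("2019-08-20T07:13:35Z")
val delayThreshold   = Duration.ofSeconds(30)
val globalWatermark  = maxEventTimeSeen.minus(delayThreshold)   // 2019-08-20 07:13:05

// system2 is lagging; its rows fall in the window [07:11:00, 07:11:30).
val system2WindowEnd = Instant.parse("2019-08-20T07:11:30Z")

// The window ends well before the global watermark, so its state is cleared and
// any further late rows from system2 with event time below the watermark are
// dropped, even though system2 is merely slow, not wrong.
assert(system2WindowEnd.isBefore(globalWatermark))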
From https://towardsdatascience.com/watermarking-in-spark-structured-streaming-9e164f373e9, since I cannot put it better myself:
Since Spark 2.1, watermarking is introduced into Structured Streaming API. You can enable it by simply adding the withWatermark-Operator to a query:

withWatermark(eventTime: String, delayThreshold: String): Dataset[T]

It takes two Parameters, a) an event time column (must be the same as the aggregate is working on) and b) a threshold to specify for how long late data should be processed (in event time unit). The state of an aggregate will then be maintained by Spark until max eventTime - delayThreshold > T, where max eventTime is the latest event time seen by the engine and T is the starting time of a window. If late data fall within this threshold, the query gets updated eventually (right image in the figure below). Otherwise it gets dropped and no reprocessing is triggered (left image in figure below).
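For concreteness, this is how the two quoted parameters map onto the query from the question (just a restatement of the code above, nothing new):

// eventTime      -> "time"        : the same column the window() aggregate uses
// delayThreshold -> "30 seconds"  : how long late data is still accepted, in event time
df.withWatermark("time", "30 seconds")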
As you can see, the concept does not involve any additional splitting by metadata such as deviceid.