Spark Streaming: understanding timeout setup in mapGroupsWithState
I am trying hard to understand the timeout setup when using mapGroupsWithState
with Spark Structured Streaming.
The link below has a very detailed explanation, but I am not sure I understand it correctly, especially the GroupState.setTimeoutTimestamp()
option, which seems to set the state expiry relative to event time.
https://spark.apache.org/docs/3.0.0-preview/api/scala/org/apache/spark/sql/streaming/GroupState.html
I copy the relevant parts here:
With EventTimeTimeout, the user also has to specify the event time watermark in the query using Dataset.withWatermark().
With this setting, data that is older than the watermark are filtered out.
The timeout can be set for a group by setting a timeout timestamp using GroupState.setTimeoutTimestamp(), and the timeout would occur when the watermark advances beyond the set timestamp.
You can control the timeout delay by two parameters - watermark delay and an additional duration beyond the timestamp in the event (which is guaranteed to be newer than the watermark due to the filtering).
Guarantees provided by this timeout are as follows:
Timeout will never occur before the watermark has exceeded the set timeout.
Similar to processing time timeouts, there is no strict upper bound on the delay when the timeout actually occurs. The watermark can advance only when there is data in the stream, and the event time of the data has actually advanced.
Question 1:
What is this timestamp
in the sentence and the timeout would occur when the watermark advances beyond the set timestamp
? Is it an absolute time, or a duration relative to the current event time of the state? I know I can expire a state explicitly by removing it.
For example, say I have some state data as below. When
will it expire, by setting what value
in what settings
?
+-------+-----------+-------------------+
|expired|something | timestamp|
+-------+-----------+-------------------+
| false| someKey |2020-08-02 22:02:00|
+-------+-----------+-------------------+
Question 2:
Reading the sentence Data that is older than the watermark are filtered out
, I understand that late-arriving data is ignored after it is read from Kafka. Is that correct?
Reason for the question:
Without understanding these, I cannot really apply them to a use case. That is, when should I use GroupState.setTimeoutDuration()
, and when should I use GroupState.setTimeoutTimestamp()
? Thanks a lot.
P.S. I also tried reading the following:
- https://www.waitingforcode.com/apache-spark-structured-streaming/stateful-transformations-mapgroupswithstate/read
(confused me, did not help my understanding)
- https://databricks.com/blog/2017/10/17/arbitrary-stateful-processing-in-apache-sparks-structured-streaming.html
(did not say much about what I was interested in)
What is this timestamp
in the sentence and the timeout would occur when the watermark advances beyond the set timestamp
?
It is the timestamp you set through GroupState.setTimeoutTimestamp()
.
is it an absolute time or is it a relative time duration to the current event time in the state?
It is a time relative to the current batch window (not a duration).
say I have some data state (column timestamp=2020-08-02 22:02:00
), when will it expire by setting up what value in what settings?
Let us assume your sink query has a processing trigger of 5 minutes (set via trigger()
). Also assume you apply a watermark before groupByKey
and mapGroupsWithState
. I understand you want to use a timeout based on event time (not processing time), so your query will look something like:
ds.withWatermark("timestamp", "10 minutes")
  .groupByKey(...) // declare your key
  .mapGroupsWithState(
    GroupStateTimeout.EventTimeTimeout)(
    ...) // your custom update logic
Now, it depends on how you set the timeout timestamp within your "custom update logic". Somewhere in that update logic you will need to call
state.setTimeoutTimestamp()
This method has four different signatures, and it is worth studying their documentation. Since we set a watermark (with withWatermark
), we can actually make use of that time. As a general rule: it is important to set the timeout timestamp (via state.setTimeoutTimestamp()
) to a value larger than the current watermark. To continue our example, we add one hour, as shown below:
state.setTimeoutTimestamp(state.getCurrentWatermarkMs, "1 hour")
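To make this concrete, here is a hedged sketch of what such a custom update function could look like. The case classes and the function name are purely illustrative (they do not appear in the original question); only the GroupState calls (hasTimedOut, getOption, update, remove, setTimeoutTimestamp, getCurrentWatermarkMs) are the actual Spark API:

```scala
import java.sql.Timestamp
import org.apache.spark.sql.streaming.GroupState

// Illustrative types only - adapt to your own schema.
case class Event(key: String, timestamp: Timestamp)
case class SessionState(count: Long)
case class SessionResult(key: String, count: Long, expired: Boolean)

def updateSessions(
    key: String,
    events: Iterator[Event],
    state: GroupState[SessionState]): SessionResult = {
  if (state.hasTimedOut) {
    // The watermark has advanced beyond the timeout timestamp we set earlier.
    val result = SessionResult(key, state.get.count, expired = true)
    state.remove()
    result
  } else {
    val updated = SessionState(state.getOption.map(_.count).getOrElse(0L) + events.size)
    state.update(updated)
    // Absolute timeout: current watermark plus an additional hour.
    state.setTimeoutTimestamp(state.getCurrentWatermarkMs, "1 hour")
    SessionResult(key, updated.count, expired = false)
  }
}
```

This function would then be passed as the second argument to mapGroupsWithState(GroupStateTimeout.EventTimeTimeout) in the query above.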
To summarize: a message can arrive in your stream between 22:00:00
and 22:15:00
, and if it is the last message for its key, it will time out in your GroupState at 23:15:00
.
question 2: Reading the sentence Data that is older than the watermark are filtered out
, I understand the late arrival data is ignored after it is read from kafka, this is correct?
Yes, that is correct. For the batch interval 22:00:00 - 22:05:00, all messages whose event time (defined by the column timestamp
) arrives later than the declared 10-minute watermark (meaning later than 22:15:00) will be ignored in your query and will not be processed by your "custom update logic".
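To illustrate the filtering arithmetic (this is just timestamp math, not the Spark API, assuming a 10-minute watermark delay and a latest observed event time of 22:15:00 as in the example above):

```scala
import java.time.{Duration, LocalDateTime}

// Assumed values for illustration only.
val watermarkDelay   = Duration.ofMinutes(10)                     // withWatermark("timestamp", "10 minutes")
val maxEventTimeSeen = LocalDateTime.parse("2020-08-02T22:15:00") // latest event time observed so far

// The watermark trails the maximum observed event time by the declared delay.
val watermark = maxEventTimeSeen.minus(watermarkDelay)            // 2020-08-02T22:05:00

// A record is dropped when its event time is older than the watermark.
def isDropped(eventTime: LocalDateTime): Boolean = eventTime.isBefore(watermark)

isDropped(LocalDateTime.parse("2020-08-02T22:02:00")) // true: older than the watermark
```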