基于事件时间的 Flink Idle State Retention
Flink Idle State Retention based on event time
这可能是一个简单的问题,但我在文档中找不到明确说明:使用 StreamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
时 Flink 的空闲状态保留是基于事件时间计算的,还是始终基于关于处理时间?
我遇到的情况与 Idle State Retention Time 文档中描述的非常相似,所以我将以此为例。我正在计算一段时间内每个会话的点击次数
SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId;
并期望会话最终变为非活动状态,这是我希望空闲状态保留策略生效的地方。与示例的唯一区别是在作业启动时,我编写了一个自定义源函数在切换到 Kafka 以获取新传入事件之前,最初从 S3 读取历史事件(处理最近 N 天的数据)。假设我将空闲状态保留设置为 72 小时,并处理来自 S3 的过去一个月的数据——最终我希望状态的大小稳定在大约 3 天的数据量下,同时处理来自 S3 的数据作为非活动状态会话被删除。实际上,状态在处理上个月的数据的整个过程中持续增长。
不幸的是 windows 我实际工作的时间要长得多(目前空闲状态保留设置为 20 天)所以我还没有机会看看状态是否会收缩它达到了处理时间的那个点。也有可能是我在我的源函数中做错了什么,导致空闲状态保留清理无法正常工作,因此我们将不胜感激。
Flink 的空闲状态保留时间SQL 基于处理时间。
这可能是一个简单的问题,但我在文档中找不到明确说明:使用 StreamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
时 Flink 的空闲状态保留是基于事件时间计算的,还是始终基于关于处理时间?
我遇到的情况与 Idle State Retention Time 文档中描述的非常相似,所以我将以此为例。我正在计算一段时间内每个会话的点击次数
SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId;
并期望会话最终变为非活动状态,这是我希望空闲状态保留策略生效的地方。与示例的唯一区别是在作业启动时,我编写了一个自定义源函数在切换到 Kafka 以获取新传入事件之前,最初从 S3 读取历史事件(处理最近 N 天的数据)。假设我将空闲状态保留设置为 72 小时,并处理来自 S3 的过去一个月的数据——最终我希望状态的大小稳定在大约 3 天的数据量下,同时处理来自 S3 的数据作为非活动状态会话被删除。实际上,状态在处理上个月的数据的整个过程中持续增长。
不幸的是 windows 我实际工作的时间要长得多(目前空闲状态保留设置为 20 天)所以我还没有机会看看状态是否会收缩它达到了处理时间的那个点。也有可能是我在我的源函数中做错了什么,导致空闲状态保留清理无法正常工作,因此我们将不胜感激。
Flink 的空闲状态保留时间SQL 基于处理时间。