有没有一种方法可以定义由最近未被事件触及的条目组成的动态 Table?
Is there a way to define a Dynamic Table comprised of entries that have NOT been touched by an event recently?
我是 Flink 的新手,我正在尝试使用它来实时查看我的应用程序。至少我想构建的动态视图之一是显示未满足 SLA 或基本上已过期的条目,并且其条件是简单的时间戳比较。所以我基本上希望一个条目显示在我的动态 table 中,如果它 NOT 最近被一个事件触及的话。在开发环境中使用 Flink 1.6(受 AWS Kinesis 限制)时,我没有看到 Flink 正在重新评估条件,除非事件触及该条目。
我已将我的开发环境插入 Kinesis 流,该流从 Web 服务器发送实时访问日志事件。这不是我的真实用例,但它很容易开始测试。我编写了一个简单的 table 查询,它提取请求路径、上次访问时间,并计算一个布尔标志以指示它是否在最后一分钟未被访问。我正在通过连接到 PrintSinkFunction 的收回流对此进行调试,因此所有 updates/deletes 都打印到我的控制台。
tEnv.registerDataStream("AccessLogs", accessLogs, "username, status, request, responseSize, referrer, userAgent, requestTime, ActionTime.rowtime");
Table paths = tEnv.sqlQuery("SELECT request AS path, MAX(requestTime) as lastTime, CASE WHEN MAX(requestTime) < CURRENT_TIMESTAMP - INTERVAL '1' MINUTE THEN 1 ELSE 0 END AS expired FROM AccessLogs GROUP BY request");
DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(paths, Row.class);
retractStream .addSink(new PrintSinkFunction<>());
我希望当我访问一个页面时,一个添加事件被发送到这个流。然后,如果我等待 1 分钟(什么都不做),我的 table 中的 CASE 语句将评估为 1,因此我应该看到一个 Delete 然后 Add 事件并设置了该标志。
我实际看到的是,在我再次加载该页面之前没有任何反应。 Delete 事件实际上设置了标志,而紧随其后的 Add 事件再次清除了它(因为它不再“过期”)。
// add/delete, path, lastAccess, expired
(true,/mypage,2019-05-20 20:02:48.0,0) // first page load, add event
(false,/mypage,2019-05-20 20:02:48.0,1) // second load > 2 mins later, remove event for the entry with expired flag set
(true,/mypage,2019-05-20 20:05:01.0,0) // second load, add event
编辑:我在搜索中遇到的最有用的提示是创建一个 。我认为这是我可以用我的动态 tables 做的事情(在某些情况下我最终会使用中间流来查看计算的日期),但希望它不必达到那个目的。
我已经使用 ProcessFunction 方法,但它需要比我最初想象的更多的修改:
- 我必须在我的 POJO 中添加一个字段,该字段在 onTimer() 方法中发生变化(可以是日期或您每次都简单修改的版本)
- 我必须将此字段注册为动态的一部分 table
- 我不得不在我的查询中使用这个字段,以便重新评估查询并更改布尔标志(即使我实际上没有使用新字段)。我只是将其添加为 SELECT 子句的一部分。
您的方法看起来很有希望,但是 Flink Table API / SQL(目前)不支持与移动 "now" 时间戳的比较。
我会分两步解决这个问题。
- 在更新模式下注册动态 table,即根据版本时间戳(
requestTime
在你的情况下)。生成的动态 table 将保留每个请求的最新行。
- 使用像您这样的简单过滤谓词进行查询,比较动态(更新插入)table 行的版本时间戳,并过滤掉时间戳与现在太接近的所有行。
不幸的是,这两个功能(更新插入转换和与移动 "now" 时间戳的比较)在 Flink 中都不可用。不过,upsert table 转换还有一些正在进行的工作。
我是 Flink 的新手,我正在尝试使用它来实时查看我的应用程序。至少我想构建的动态视图之一是显示未满足 SLA 或基本上已过期的条目,并且其条件是简单的时间戳比较。所以我基本上希望一个条目显示在我的动态 table 中,如果它 NOT 最近被一个事件触及的话。在开发环境中使用 Flink 1.6(受 AWS Kinesis 限制)时,我没有看到 Flink 正在重新评估条件,除非事件触及该条目。
我已将我的开发环境插入 Kinesis 流,该流从 Web 服务器发送实时访问日志事件。这不是我的真实用例,但它很容易开始测试。我编写了一个简单的 table 查询,它提取请求路径、上次访问时间,并计算一个布尔标志以指示它是否在最后一分钟未被访问。我正在通过连接到 PrintSinkFunction 的收回流对此进行调试,因此所有 updates/deletes 都打印到我的控制台。
tEnv.registerDataStream("AccessLogs", accessLogs, "username, status, request, responseSize, referrer, userAgent, requestTime, ActionTime.rowtime");
Table paths = tEnv.sqlQuery("SELECT request AS path, MAX(requestTime) as lastTime, CASE WHEN MAX(requestTime) < CURRENT_TIMESTAMP - INTERVAL '1' MINUTE THEN 1 ELSE 0 END AS expired FROM AccessLogs GROUP BY request");
DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(paths, Row.class);
retractStream .addSink(new PrintSinkFunction<>());
我希望当我访问一个页面时,一个添加事件被发送到这个流。然后,如果我等待 1 分钟(什么都不做),我的 table 中的 CASE 语句将评估为 1,因此我应该看到一个 Delete 然后 Add 事件并设置了该标志。
我实际看到的是,在我再次加载该页面之前没有任何反应。 Delete 事件实际上设置了标志,而紧随其后的 Add 事件再次清除了它(因为它不再“过期”)。
// add/delete, path, lastAccess, expired
(true,/mypage,2019-05-20 20:02:48.0,0) // first page load, add event
(false,/mypage,2019-05-20 20:02:48.0,1) // second load > 2 mins later, remove event for the entry with expired flag set
(true,/mypage,2019-05-20 20:05:01.0,0) // second load, add event
编辑:我在搜索中遇到的最有用的提示是创建一个
我已经使用 ProcessFunction 方法,但它需要比我最初想象的更多的修改:
- 我必须在我的 POJO 中添加一个字段,该字段在 onTimer() 方法中发生变化(可以是日期或您每次都简单修改的版本)
- 我必须将此字段注册为动态的一部分 table
- 我不得不在我的查询中使用这个字段,以便重新评估查询并更改布尔标志(即使我实际上没有使用新字段)。我只是将其添加为 SELECT 子句的一部分。
您的方法看起来很有希望,但是 Flink Table API / SQL(目前)不支持与移动 "now" 时间戳的比较。
我会分两步解决这个问题。
- 在更新模式下注册动态 table,即根据版本时间戳(
requestTime
在你的情况下)。生成的动态 table 将保留每个请求的最新行。 - 使用像您这样的简单过滤谓词进行查询,比较动态(更新插入)table 行的版本时间戳,并过滤掉时间戳与现在太接近的所有行。
不幸的是,这两个功能(更新插入转换和与移动 "now" 时间戳的比较)在 Flink 中都不可用。不过,upsert table 转换还有一些正在进行的工作。