Apache Flink:如何为动态表启用 "upsert mode"?

Apache Flink: How to enable "upsert mode" for dynamic tables?

我在 Flink 文档和官方 Flink 博客中多次提到动态 table 的 "upsert mode" .但是,我没有看到任何关于如何在动态 table.

上启用此模式的示例/文档。

示例:

所以我的问题是:

链接的资源描述了两种不同的场景。

  • blog post 讨论了更新插入 DataStream -> Table 转换。
  • documentation 描述了反向更新插入 Table -> DataStream 转换。

以下讨论基于Flink 1.4.0(2018年1月)

更新插入DataStream -> Table转换

通过对键的更新插入将 DataStream 转换为 Table 本身不受支持,但在路线图上。同时,您可以使用附加 Table 和具有用户定义聚合函数的查询来模拟此行为。

如果您有一个追加 Table Logins 与跟踪用户登录的架构 (user, loginTime, ip),您可以将其转换为一个 upsert Tableuser 使用以下查询:

SELECT user, LAST_VAL(loginTime), LAST_VAL(ip) FROM Logins GROUP BY user

LAST_VAL 聚合函数是一个 user-defined aggregation function 始终 returns 最新添加的值。

对 upsert DataStream -> Table 转换的本机支持将以基本相同的方式工作,尽管提供了更简洁的 API.

更新插入Table -> DataStream转换

不支持将 Table 转换为更新插入 DataStream。这也正确地反映在文档中:

Please note that only append and retract streams are supported when converting a dynamic table into a DataStream.

我们故意选择不支持 upsert Table -> DataStream 转换,因为 upsert DataStream 只有在知道关键属性的情况下才能处理。这些取决于查询,并不总是可以直接识别。开发人员有责任确保正确解释关键属性。否则会导致错误的程序。为避免出现问题,我们决定不提供 upsert Table -> DataStream 转换。

相反,用户可以将 Table 转换为撤回 DataStream。此外,我们支持 UpsertTableSink 将更新插入 DataStream 写入外部系统,例如数据库或键值存储。

Flink 1.8 还缺少这样的支持。期望将来添加这些功能:1) LAST_VAL 2) Upsert Stream <-> Dynamic Table.

ps。 LAST_VAL() 似乎无法在 UDTF 中实现。聚合函数不提供附加的 event/proc 时间上下文。阿里巴巴的Blink提供了LAST_VAL的替代实现,但是需要另外一个字段来提供订单信息,不能直接在event/proc时间。这使得 sql 代码难看。 (https://help.aliyun.com/knowledge_detail/62791.html)

我对 LAST_VAL(eg.get 最新 ip)的解决方案是这样的:

  1. concat(ts, ip) as ordered_ip
  2. MAX(ordered_ip) 作为 ordered_ip
  3. 提取(ordered_ip) 作为 ip

更新:自 Flink 1.9 起,LAST_VALUEbuild-in aggregate functions 的一部分,如果我们使用 Blink 规划器(这是 Flink 1.11 以来的默认设置)。

假设上面 Fabian Hueske 的回复中提到的 Logins table 存在,我们现在可以将其转换为更新插入 table ,如下所示:

SELECT 
  user, 
  LAST_VALUE(loginTime), 
  LAST_VALUE(ip) 
FROM Logins 
GROUP BY user