Apache Flink:如何为动态表启用 "upsert mode"?
Apache Flink: How to enable "upsert mode" for dynamic tables?
我在 Flink 文档和官方 Flink 博客中多次提到动态 table 的 "upsert mode" .但是,我没有看到任何关于如何在动态 table.
上启用此模式的示例/文档。
示例:
-
When defining a dynamic table on a stream via update mode, we can specify a unique key attribute on the table. In that case, update and delete operations are performed with respect to the key attribute. The update mode is visualized in the following figure.
-
A dynamic table that is converted into an upsert stream requires a (possibly composite) unique key.
所以我的问题是:
- 如何在 Flink 的动态 table 上指定唯一键属性?
- 如何将动态 table 置于 update/upsert/"replace" 模式,而不是追加模式?
链接的资源描述了两种不同的场景。
- blog post 讨论了更新插入
DataStream -> Table
转换。
- documentation 描述了反向更新插入
Table -> DataStream
转换。
以下讨论基于Flink 1.4.0(2018年1月)
更新插入DataStream -> Table
转换
通过对键的更新插入将 DataStream
转换为 Table
本身不受支持,但在路线图上。同时,您可以使用附加 Table
和具有用户定义聚合函数的查询来模拟此行为。
如果您有一个追加 Table
Logins
与跟踪用户登录的架构 (user, loginTime, ip)
,您可以将其转换为一个 upsert Table
在 user
使用以下查询:
SELECT user, LAST_VAL(loginTime), LAST_VAL(ip) FROM Logins GROUP BY user
LAST_VAL
聚合函数是一个 user-defined aggregation function 始终 returns 最新添加的值。
对 upsert DataStream -> Table
转换的本机支持将以基本相同的方式工作,尽管提供了更简洁的 API.
更新插入Table -> DataStream
转换
不支持将 Table
转换为更新插入 DataStream
。这也正确地反映在文档中:
Please note that only append and retract streams are supported when converting a dynamic table into a DataStream.
我们故意选择不支持 upsert Table -> DataStream
转换,因为 upsert DataStream
只有在知道关键属性的情况下才能处理。这些取决于查询,并不总是可以直接识别。开发人员有责任确保正确解释关键属性。否则会导致错误的程序。为避免出现问题,我们决定不提供 upsert Table -> DataStream
转换。
相反,用户可以将 Table
转换为撤回 DataStream
。此外,我们支持 UpsertTableSink
将更新插入 DataStream
写入外部系统,例如数据库或键值存储。
Flink 1.8 还缺少这样的支持。期望将来添加这些功能:1) LAST_VAL 2) Upsert Stream <-> Dynamic Table.
ps。 LAST_VAL() 似乎无法在 UDTF 中实现。聚合函数不提供附加的 event/proc 时间上下文。阿里巴巴的Blink提供了LAST_VAL的替代实现,但是需要另外一个字段来提供订单信息,不能直接在event/proc时间。这使得 sql 代码难看。 (https://help.aliyun.com/knowledge_detail/62791.html)
我对 LAST_VAL(eg.get 最新 ip)的解决方案是这样的:
- concat(ts, ip) as ordered_ip
- MAX(ordered_ip) 作为 ordered_ip
- 提取(ordered_ip) 作为 ip
更新:自 Flink 1.9 起,LAST_VALUE
是 build-in aggregate functions 的一部分,如果我们使用 Blink 规划器(这是 Flink 1.11 以来的默认设置)。
假设上面 Fabian Hueske 的回复中提到的 Logins
table 存在,我们现在可以将其转换为更新插入 table ,如下所示:
SELECT
user,
LAST_VALUE(loginTime),
LAST_VALUE(ip)
FROM Logins
GROUP BY user
我在 Flink 文档和官方 Flink 博客中多次提到动态 table 的 "upsert mode" .但是,我没有看到任何关于如何在动态 table.
上启用此模式的示例/文档。示例:
-
When defining a dynamic table on a stream via update mode, we can specify a unique key attribute on the table. In that case, update and delete operations are performed with respect to the key attribute. The update mode is visualized in the following figure.
-
A dynamic table that is converted into an upsert stream requires a (possibly composite) unique key.
所以我的问题是:
- 如何在 Flink 的动态 table 上指定唯一键属性?
- 如何将动态 table 置于 update/upsert/"replace" 模式,而不是追加模式?
链接的资源描述了两种不同的场景。
- blog post 讨论了更新插入
DataStream -> Table
转换。 - documentation 描述了反向更新插入
Table -> DataStream
转换。
以下讨论基于Flink 1.4.0(2018年1月)
更新插入DataStream -> Table
转换
通过对键的更新插入将 DataStream
转换为 Table
本身不受支持,但在路线图上。同时,您可以使用附加 Table
和具有用户定义聚合函数的查询来模拟此行为。
如果您有一个追加 Table
Logins
与跟踪用户登录的架构 (user, loginTime, ip)
,您可以将其转换为一个 upsert Table
在 user
使用以下查询:
SELECT user, LAST_VAL(loginTime), LAST_VAL(ip) FROM Logins GROUP BY user
LAST_VAL
聚合函数是一个 user-defined aggregation function 始终 returns 最新添加的值。
对 upsert DataStream -> Table
转换的本机支持将以基本相同的方式工作,尽管提供了更简洁的 API.
更新插入Table -> DataStream
转换
不支持将 Table
转换为更新插入 DataStream
。这也正确地反映在文档中:
Please note that only append and retract streams are supported when converting a dynamic table into a DataStream.
我们故意选择不支持 upsert Table -> DataStream
转换,因为 upsert DataStream
只有在知道关键属性的情况下才能处理。这些取决于查询,并不总是可以直接识别。开发人员有责任确保正确解释关键属性。否则会导致错误的程序。为避免出现问题,我们决定不提供 upsert Table -> DataStream
转换。
相反,用户可以将 Table
转换为撤回 DataStream
。此外,我们支持 UpsertTableSink
将更新插入 DataStream
写入外部系统,例如数据库或键值存储。
Flink 1.8 还缺少这样的支持。期望将来添加这些功能:1) LAST_VAL 2) Upsert Stream <-> Dynamic Table.
ps。 LAST_VAL() 似乎无法在 UDTF 中实现。聚合函数不提供附加的 event/proc 时间上下文。阿里巴巴的Blink提供了LAST_VAL的替代实现,但是需要另外一个字段来提供订单信息,不能直接在event/proc时间。这使得 sql 代码难看。 (https://help.aliyun.com/knowledge_detail/62791.html)
我对 LAST_VAL(eg.get 最新 ip)的解决方案是这样的:
- concat(ts, ip) as ordered_ip
- MAX(ordered_ip) 作为 ordered_ip
- 提取(ordered_ip) 作为 ip
更新:自 Flink 1.9 起,LAST_VALUE
是 build-in aggregate functions 的一部分,如果我们使用 Blink 规划器(这是 Flink 1.11 以来的默认设置)。
假设上面 Fabian Hueske 的回复中提到的 Logins
table 存在,我们现在可以将其转换为更新插入 table ,如下所示:
SELECT
user,
LAST_VALUE(loginTime),
LAST_VALUE(ip)
FROM Logins
GROUP BY user