写入 UPDATE_BEFORE 消息以更新 kafka s
Write UPDATE_BEFORE messages to upsert kafka s
我正在 https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/upsert-kafka/ 读书。
上面写着:
As a sink, the upsert-kafka connector can consume a changelog stream.
It will write INSERT/UPDATE_AFTER data as normal Kafka messages value,
and write DELETE data as Kafka messages with null values (indicate
tombstone for the key).
没有提到如果写UPDATE_BEFORE消息去upsert kafka,那会发生什么?
在同一个 link (https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/upsert-kafka/#full-example) 中,文档提供了一个完整的示例:
INSERT INTO pageviews_per_region
SELECT
user_region,
COUNT(*),
COUNT(DISTINCT user_id)
FROM pageviews
GROUP BY user_region;
通过上面的INSERT/SELECT操作,会产生INSERT/UPDATE_BEFORE/UPDATE_AFTER消息,会去upsert kafka sink,请问当upsert kafka遇到UPDATE_BEFORE消息会发生什么。
来自源码评论
/ / partial code
// During the Upsert mode during the serialization process, if the operation is executed is Rowkind.delete or Rowkind.Update_before
// set it to NULL (corresponding to Kafka tomb news)
Upsert-kafka sink 不需要 planner 发送 UPDATE_BEFORE 消息(在某些情况下 planner 可能仍会发送 UPDATE_BEFORE 消息),并且会像普通 Kafka 一样写入 INSERT/UPDATE_AFTER 消息具有关键部分的记录,并将 DELETE 消息写入具有空值的 Kafka 记录(指示键的墓碑)。 Flink 将通过对主键列值的分区数据来保证消息在主键上的排序。
Upsert-kafka源是一种更新日志源。变更日志源上的主键语义意味着具体化的变更日志 (INSERT/UPDATE_BEFORE/UPDATE_AFTER/DELETE) 在主键约束上是唯一的。 Flink 假定所有消息在主键上都是有序的。
实施细节
由于 upsert-kafka 连接器只产生不包含 UPDATE_BEFORE 消息的 upsert 流。但是,一些操作需要 UPDATE_BEFORE 消息才能正确处理,例如聚合。因此,我们需要有一个物理节点来具体化更新插入流并生成包含完整更改消息的更改日志流。在物理运算符中,我们将使用状态来知道密钥是否是第一次被看到。运算符将生成 INSERT 行,或者另外为上一个图像生成 UPDATE_BEFORE 行,或者生成所有列都填充值的 DELETE 行。
我正在 https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/upsert-kafka/ 读书。
上面写着:
As a sink, the upsert-kafka connector can consume a changelog stream. It will write INSERT/UPDATE_AFTER data as normal Kafka messages value, and write DELETE data as Kafka messages with null values (indicate tombstone for the key).
没有提到如果写UPDATE_BEFORE消息去upsert kafka,那会发生什么?
在同一个 link (https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/upsert-kafka/#full-example) 中,文档提供了一个完整的示例:
INSERT INTO pageviews_per_region
SELECT
user_region,
COUNT(*),
COUNT(DISTINCT user_id)
FROM pageviews
GROUP BY user_region;
通过上面的INSERT/SELECT操作,会产生INSERT/UPDATE_BEFORE/UPDATE_AFTER消息,会去upsert kafka sink,请问当upsert kafka遇到UPDATE_BEFORE消息会发生什么。
来自源码评论
/ / partial code
// During the Upsert mode during the serialization process, if the operation is executed is Rowkind.delete or Rowkind.Update_before
// set it to NULL (corresponding to Kafka tomb news)
Upsert-kafka sink 不需要 planner 发送 UPDATE_BEFORE 消息(在某些情况下 planner 可能仍会发送 UPDATE_BEFORE 消息),并且会像普通 Kafka 一样写入 INSERT/UPDATE_AFTER 消息具有关键部分的记录,并将 DELETE 消息写入具有空值的 Kafka 记录(指示键的墓碑)。 Flink 将通过对主键列值的分区数据来保证消息在主键上的排序。
Upsert-kafka源是一种更新日志源。变更日志源上的主键语义意味着具体化的变更日志 (INSERT/UPDATE_BEFORE/UPDATE_AFTER/DELETE) 在主键约束上是唯一的。 Flink 假定所有消息在主键上都是有序的。
实施细节 由于 upsert-kafka 连接器只产生不包含 UPDATE_BEFORE 消息的 upsert 流。但是,一些操作需要 UPDATE_BEFORE 消息才能正确处理,例如聚合。因此,我们需要有一个物理节点来具体化更新插入流并生成包含完整更改消息的更改日志流。在物理运算符中,我们将使用状态来知道密钥是否是第一次被看到。运算符将生成 INSERT 行,或者另外为上一个图像生成 UPDATE_BEFORE 行,或者生成所有列都填充值的 DELETE 行。