Apache Flink 是否使用以前的状态来重新计算聚合?
Does Apache Flink use previous state in order to recalculate aggregations?
我使用 Kafka 连接器(使用 Flink SQL)创建了一个 purchases
table:
CREATE TABLE purchases (
country STRING,
product STRING
) WITH (
'connector' = 'kafka',
'topic' = 'purchases',
'properties.bootstrap.servers' = 'kafka:29092',
'value.format' = 'json',
'properties.group.id' = '1',
'scan.startup.mode' = 'earliest-offset'
);
然后我在 Apache Flink 中执行以下聚合:
SELECT `country`, `product`, `purchases`
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY country ORDER BY `purchases` DESC) AS row_num
FROM (select country, product, count(*) as `purchases` from purchases group by country, product))
WHERE row_num <= 3;
上面基本上有2个计算:
- 按国家和产品分组的购买(
select country, product, count(*) as
购买 from purchases group by country, product
)
- top-n 聚合每个国家/地区排名前 3 的产品。
我想知道 purchases
table 中的每一行都发生了什么:Flink 是否重新计算所有国家和产品的每个分组,并重新计算每个国家的前 3 名产品 或者 Flink 是否足够聪明,只重新计算新记录所属的组,也足够聪明,只为产品所属的组重新计算前 3 行?
我认为 Flink 在这两种情况下都足够智能,因为重新计算所有行的开销会非常高,但是我在文档中找不到关于此的任何明确信息。
Flink 使用状态来避免重新计算,而是增量计算所需的结果。
如
select country, product, count(*) as `purchases`
from purchases
group by country, product
Flink 将为每个不同的 country/product 对维护一个计数器,并为每个传入事件增加适当的计数器。
在您的查询中使用 EXPLAIN 揭示了优化的执行计划的样子:
Rank(strategy=[UpdateFastStrategy[0,1]], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=3], partitionBy=[country], orderBy=[purchases DESC], select=[country, product, purchases])
+- Exchange(distribution=[hash[country]])
+- GroupAggregate(groupBy=[country, product], select=[country, product, COUNT(*) AS purchases])
+- Exchange(distribution=[hash[country, product]])
+- TableSourceScan(table=[[default_catalog, default_database, purchases]], fields=[country, product])
这告诉我们
- 输入是仅附加流(而不是更新的 CDC 流)
- 该流然后在(国家、产品)上进行散列分区
- 然后使用 GroupAggregate 运算符来实现分区计数
- GroupAggregate 运算符生成更新流,然后按国家/地区重新哈希分区
- Rank 运算符使用该更新计数流,并使用 UpdateFastStrategy 生成最终结果(此策略利用了知道传入的更新流将不包含任何删除,并且更新将单调递增)
一般来说,如果您希望减少保存的状态量,并且可以接受结果不完全准确的风险,您可以 configure Flink's Table/SQL API to expire idle state。但我不知道这将如何影响 Rank 运算符。
我使用 Kafka 连接器(使用 Flink SQL)创建了一个 purchases
table:
CREATE TABLE purchases (
country STRING,
product STRING
) WITH (
'connector' = 'kafka',
'topic' = 'purchases',
'properties.bootstrap.servers' = 'kafka:29092',
'value.format' = 'json',
'properties.group.id' = '1',
'scan.startup.mode' = 'earliest-offset'
);
然后我在 Apache Flink 中执行以下聚合:
SELECT `country`, `product`, `purchases`
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY country ORDER BY `purchases` DESC) AS row_num
FROM (select country, product, count(*) as `purchases` from purchases group by country, product))
WHERE row_num <= 3;
上面基本上有2个计算:
- 按国家和产品分组的购买(
select country, product, count(*) as
购买from purchases group by country, product
) - top-n 聚合每个国家/地区排名前 3 的产品。
我想知道 purchases
table 中的每一行都发生了什么:Flink 是否重新计算所有国家和产品的每个分组,并重新计算每个国家的前 3 名产品 或者 Flink 是否足够聪明,只重新计算新记录所属的组,也足够聪明,只为产品所属的组重新计算前 3 行?
我认为 Flink 在这两种情况下都足够智能,因为重新计算所有行的开销会非常高,但是我在文档中找不到关于此的任何明确信息。
Flink 使用状态来避免重新计算,而是增量计算所需的结果。
如
select country, product, count(*) as `purchases`
from purchases
group by country, product
Flink 将为每个不同的 country/product 对维护一个计数器,并为每个传入事件增加适当的计数器。
在您的查询中使用 EXPLAIN 揭示了优化的执行计划的样子:
Rank(strategy=[UpdateFastStrategy[0,1]], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=3], partitionBy=[country], orderBy=[purchases DESC], select=[country, product, purchases])
+- Exchange(distribution=[hash[country]])
+- GroupAggregate(groupBy=[country, product], select=[country, product, COUNT(*) AS purchases])
+- Exchange(distribution=[hash[country, product]])
+- TableSourceScan(table=[[default_catalog, default_database, purchases]], fields=[country, product])
这告诉我们
- 输入是仅附加流(而不是更新的 CDC 流)
- 该流然后在(国家、产品)上进行散列分区
- 然后使用 GroupAggregate 运算符来实现分区计数
- GroupAggregate 运算符生成更新流,然后按国家/地区重新哈希分区
- Rank 运算符使用该更新计数流,并使用 UpdateFastStrategy 生成最终结果(此策略利用了知道传入的更新流将不包含任何删除,并且更新将单调递增)
一般来说,如果您希望减少保存的状态量,并且可以接受结果不完全准确的风险,您可以 configure Flink's Table/SQL API to expire idle state。但我不知道这将如何影响 Rank 运算符。