Apache Flink 是否使用以前的状态来重新计算聚合?

Does Apache Flink use previous state in order to recalculate aggregations?

我使用 Kafka 连接器(使用 Flink SQL)创建了一个 purchases table:

CREATE TABLE purchases (
  country STRING,
  product STRING
) WITH (
   'connector' = 'kafka',
   'topic' = 'purchases',
   'properties.bootstrap.servers' = 'kafka:29092',
   'value.format' = 'json',
   'properties.group.id' = '1',
   'scan.startup.mode' = 'earliest-offset'
);

然后我在 Apache Flink 中执行以下聚合:

SELECT `country`, `product`, `purchases`
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY country ORDER BY `purchases` DESC) AS row_num
  FROM (select country, product, count(*) as `purchases` from purchases group by country, product))
WHERE row_num <= 3;

上面基本上有2个计算:

  1. 按国家和产品分组的购买(select country, product, count(*) as 购买 from purchases group by country, product
  2. top-n 聚合每个国家/地区排名前 3 的产品。

我想知道 purchases table 中的每一行都发生了什么:Flink 是否重新计算所有国家和产品的每个分组,并重新计算每个国家的前 3 名产品 或者 Flink 是否足够聪明,只重新计算新记录所属的组,也足够聪明,只为产品所属的组重新计算前 3 行?

我认为 Flink 在这两种情况下都足够智能,因为重新计算所有行的开销会非常高,但是我在文档中找不到关于此的任何明确信息。

Flink 使用状态来避免重新计算,而是增量计算所需的结果。

select country, product, count(*) as `purchases`
from purchases
group by country, product

Flink 将为每个不同的 country/product 对维护一个计数器,并为每个传入事件增加适当的计数器。

在您的查询中使用 EXPLAIN 揭示了优化的执行计划的样子:

Rank(strategy=[UpdateFastStrategy[0,1]], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=3], partitionBy=[country], orderBy=[purchases DESC], select=[country, product, purchases])
+- Exchange(distribution=[hash[country]])
   +- GroupAggregate(groupBy=[country, product], select=[country, product, COUNT(*) AS purchases])
      +- Exchange(distribution=[hash[country, product]])
         +- TableSourceScan(table=[[default_catalog, default_database, purchases]], fields=[country, product])

这告诉我们

  • 输入是仅附加流(而不是更新的 CDC 流)
  • 该流然后在(国家、产品)上进行散列分区
  • 然后使用 GroupAggregate 运算符来实现分区计数
  • GroupAggregate 运算符生成更新流,然后按国家/地区重新哈希分区
  • Rank 运算符使用该更新计数流,并使用 UpdateFastStrategy 生成最终结果(此策略利用了知道传入的更新流将不包含任何删除,并且更新将单调递增)

一般来说,如果您希望减少保存的状态量,并且可以接受结果不完全准确的风险,您可以 configure Flink's Table/SQL API to expire idle state。但我不知道这将如何影响 Rank 运算符。