Flink 是如何清理 Keyed State 的?

How Does Flink Clean Up Keyed State?

在考虑通过某个东西进行键控的行为时,我通常会想到将所有与键匹配的事件都扔到同一个桶中的类比。可以想象,当 Flink 应用程序开始处理大量数据时,您选择作为键值的内容开始变得很重要,因为您希望确保很好地清理状态。这引出了我的问题,Flink 究竟是如何清理这些 "buckets" 的?如果桶是空的(所有 MapStates 和 ValueStates 都是空的)Flink 是否关闭键的那个区域 space 并删除桶?

示例:

传入数据格式: {userId, computerId, amountOfTimeLoggedOn}

键: UserId/ComputerId

当前密钥Space:

Flink 会最终将 Bob,Computer 11 从密钥中移除 Space 还是它会永远存在,因为它曾经有过一个事件?

Flink 不会为没有任何用户值关联的状态键存储任何数据,至少在现有的状态后端:堆(在内存中)或 RocksDB 中是这样。

Key Space 在 Flink 中是虚拟的,Flink 不对可能存在哪些具体的 key 做任何假设。每个键或键的子集没有任何预分配的桶。只有当用户应用程序为某个键写入一些值时,它才会占用存储空间。

一般的想法是所有具有相同键的记录都在同一台机器上处理(有点像你说的在同一个桶中)。某个键的本地状态也始终保存在同一台机器上(如果有的话)。不过这与检查点无关。

对于您的示例,如果在某个时间点为 [Bob,Computer 11] 写入了一些值,然后随后将其删除,Flink 将使用密钥将其完全删除。

简答

它在生存时间 (TTL) feature of Flink State and Java Garbage Collector (GC) 的帮助下进行清理。 TTL 功能将删除对状态条目的任何引用,GC 将收回分配的内存。

长答案

你的问题可以分为3个子问题:

我会尽量简短。

Flink是如何根据Key对数据进行分区的?

对于键控流上的操作符,Flink 在 Consistent Hashing Algorithm 的帮助下根据键对数据进行分区。它创建 max_parallelism 个桶。每个运算符实例都分配了一个或多个这些桶。每当要向下游发送数据时,都会将密钥分配给其中一个存储桶,然后将其发送给相关的运算符实例。 这里没有存储键,因为范围是通过数学计算的。因此任何时候都不会清除任何区域或删除桶。您可以创建任何类型的密钥。它不会影响键空间或范围方面的内存。

Flink如何用Key存储状态?

所有运算符实例都有一个实例级状态存储。该存储定义了该运算符实例的状态上下文,并且它可以存储多个命名状态存储,例如“count”、“sum”、“some-name”等。这些命名状态存储是键值存储,可以根据数据的键存储值。

这些 KV 存储是在我们使用运算符的 open() 函数中的状态描述符初始化状态时创建的。即 getRuntimeContext().getValueState().

这些 KV 存储仅在需要在状态中存储某些内容时才存储数据。 (比如 HashMap.put(k,v))。 因此,除非调用状态更新方法(如 updateaddput),否则不会存储键或值。

所以,

  • 如果 Flink 没有看到密钥,则不会为该密钥存储任何内容。
  • 如果 Flink 已经看到密钥但没有调用状态更新方法,则不会为该密钥存储任何内容。
  • 如果为键调用状态更新方法,则键值对将存储在KV存储中。

Flink如何清理Key的状态?

除非用户要求或用户手动完成,否则Flink不会删除状态。前面说了,Flink 有状态的 TTL 特性。此 TTL 将标记状态到期并在调用清理策略时将其删除。这些清理策略因后端类型和清理时间而异。对于堆状态后端,它将从状态 table 中删除条目,即删除对该条目的任何引用。这个未被引用的条目占用的内存将被 Java GC 清理。对于RocksDB State Backend,它只是调用RocksDB原生的delete方法。