用于添加全局存储的 Kafka 流用例

Kafka streams use cases for add global store

在kafka streams中定义拓扑时,可以添加全局状态存储。它将需要一个源主题以及一个 ProcessorSupplier。 处理器接收记录,理论上可以在将它们添加到商店之前对其进行转换。但是在恢复的情况下,记录直接从源主题(变更日志)插入到全局状态存储中,跳过在处理器中完成的最终转换。

   +-------------+             +-------------+              +---------------+
   |             |             |             |              |    global     |
   |source topic  ------------->  processor  +-------------->    state      |
   |(changelog)  |             |             |              |    store      |
   +-------------+             +-------------+              +---------------+
          |                                                         ^
          |                                                         |
          +---------------------------------------------------------+
              record directly inserted during restoration

StreamsBuilder#addGlobalStore(StoreBuilder storeBuilder, String topic, Consumed consumed, ProcessorSupplier stateUpdateSupplier) 将全局 StateStore 添加到拓扑中。

根据文档

NOTE: you should not use the Processor to insert transformed records into the global state store. This store uses the source topic as changelog and during restore will insert records directly from the source. This ProcessorNode should be used to keep the StateStore up-to-date.

与此同时,kafka 错误跟踪器上目前已打开主要错误:KAFKA-7663 从主题 恢复状态时不使用 addGlobalStore 上提供的自定义处理器,这准确解释了文档中所述的内容,但似乎是一个可接受的错误。

我想知道 KAFKA-7663 是否确实是一个错误。根据文档,它似乎是这样设计的,在这种情况下,我很难理解用例。
有人可以解释这个低级别 API 的主要用例吗?我唯一能想到的就是处理副作用,比如在处理器中做一些日志操作。

额外问题:如果源主题充当全局存储的变更日志,当一条记录因为保留期已过而从主题中删除时,它是否会从全局状态存储中删除?还是仅在从更新日志完全恢复商店后才在商店中进行删除。

是的,这是一个很奇怪的小 catch-22,但文档是正确的。全局状态存储的处理器不得对记录执行任何操作,但必须将它们保存到存储中。

AFAIK,这不是一个哲学问题,只是一个实际问题。原因很简单,就是您观察到的行为......Streams 将输入主题视为商店的变更日志主题,因此在恢复期间绕过处理器(以及反序列化)。

状态恢复绕过任何处理的原因是通常更改日志中的数据与存储中的数据相同,所以做任何新的事情实际上都是错误的给它。另外,将字节从线路中取出并将它们批量写入状态存储会更有效。我说 "usually" 是因为在这种情况下,输入主题与普通的变更日志主题不完全一样,因为它不会在存储放置期间接收到它的写入。

对于它的价值,我也很难理解用例。看起来,我们应该:

  1. 完全摆脱那个处理器,总是将二进制数据从网络上转储到存储中,就像恢复一样。
  2. 重新设计全局存储以允许在全局存储之前进行任意转换。我们可以:
    • 继续使用输入主题并在恢复期间反序列化和调用处理器,或者
    • 为全局存储添加一个真实更新日志,这样我们就可以轮询输入主题,应用一些转换,然后写入全局存储 全局商店变更日志。然后,我们可以使用更改日志(不是输入)进行恢复和复制。

顺便说一下,如果您想要后者的行为,您现在可以通过应用您的转换然后使用 to(my-global-changelog) 来制造一个 "changelog" 主题来近似它。然后,您将创建全局存储以从您的 my-global-changelog 而不是输入中读取。

所以,直接给你答案,KAFKA-7663 不是 bug。我将对提议将其转变为功能请求的票发表评论。

奖励答案:充当状态存储变更日志的主题不得配置保留。实际上,这意味着您应该通过启用压缩和禁用日志保留来防止无限增长。

实际上,旧数据失去保留并被丢弃并不是 "event",消费者无法知道 if/when 它发生了。因此,不可能从状态存储中删除数据以响应此非事件。它会像您描述的那样发生……这些记录将无限期地存放在全球商店中。 If/when 一个实例被替换,新实例将从输入中恢复并且(显然)只接收当时存在于主题中的记录。因此,Streams 集群作为一个整体将以不一致的全局状态视图告终。这就是为什么你应该禁用保留。

从商店中 "drop" 旧数据的正确方法是将所需键的墓碑写入输入主题。然后这将正确传播到集群的所有成员,在恢复期间正确应用,并由代理正确压缩。

希望以上内容对您有所帮助。当然,请在票上插话,帮助我们塑造 API 更直观!

目前似乎没有办法监听 KGlobalTable 的变化。

您可以使用全局存储和自定义处理器实现类似的结果。

我在这里偶然发现了这个

我并不是说这是一个 好的 用例,但作为一种解决方法它可能会有所帮助。