Kafka Streams - 添加新源流后使用现有状态存储

Kafka Streams - Using An Existing State Store After Adding a New Source Stream

我有一个现有的流,它使用两个主题作为其来源:

val streamsBuilder = new StreamsBuilder
val stream1 = streamsBuilder.stream[K, V]("topic1")
val stream2 = streamsBuilder.stream[K, V]("topic2")

stream1
  .merge(stream2)
  .groupByKey
  .reduce(reduceValues)
  .toStream
  .to("result-topic")

StateStore 的自动生成名称是 KSTREAM-REDUCE-STATE-STORE-0000000003

现在我需要再添加一个主题作为来源。但是,添加新源会增加 a kafka-internal number,导致 StateStore 变为 KSTREAM-REDUCE-STATE-STORE-0000000005。我不想失去当前状态,所以我明确提供了旧的名称 StateStore:

val streamsBuilder = new StreamsBuilder
val stream1 = streamsBuilder.stream[K, V]("topic1")
val stream2 = streamsBuilder.stream[K, V]("topic2")
val stream3 = streamsBuilder.stream[K, V]("topic3") // new topic

stream1
  .merge(stream2)
  .merge(stream3) // merge new topic
  .groupByKey
  .reduce(reduceValues)(Materialized.as("KSTREAM-REDUCE-STATE-STORE-0000000003")
  .toStream
  .to("result-topic")

它似乎有效,但我不确定我是否在干扰 Kafka 内部,因为:

  1. 我正在使用 Kafka 自动生成的自定义名称(名称冲突的可能性?)
  2. 用于提供此 StateStore 的流集与最初不同。

有意见吗?

老实说,最安全的选择是为此状态添加人类可读的名称,但正如您提到的,您将失去它。

我认为您所做的应该没有任何问题(至少在您引入另一个代码更改之前:))。 ID 0000000003 将被分配给 groupByKey 运算符,因此不会有任何冲突(尽管我不是 100% 确定那里的 Kafka Streams 内部结构)。

还有 Application Reset Tool 允许您重新生成聚合。但我不知道它是否适用于您的情况:您对输入主题的保留政策可能会阻止此工具重新生成精确的聚合。