Kafka Streams - 添加新源流后使用现有状态存储
Kafka Streams - Using An Existing State Store After Adding a New Source Stream
我有一个现有的流,它使用两个主题作为其来源:
val streamsBuilder = new StreamsBuilder
val stream1 = streamsBuilder.stream[K, V]("topic1")
val stream2 = streamsBuilder.stream[K, V]("topic2")
stream1
.merge(stream2)
.groupByKey
.reduce(reduceValues)
.toStream
.to("result-topic")
StateStore
的自动生成名称是 KSTREAM-REDUCE-STATE-STORE-0000000003
。
现在我需要再添加一个主题作为来源。但是,添加新源会增加 a kafka-internal number,导致 StateStore
变为 KSTREAM-REDUCE-STATE-STORE-0000000005
。我不想失去当前状态,所以我明确提供了旧的名称 StateStore
:
val streamsBuilder = new StreamsBuilder
val stream1 = streamsBuilder.stream[K, V]("topic1")
val stream2 = streamsBuilder.stream[K, V]("topic2")
val stream3 = streamsBuilder.stream[K, V]("topic3") // new topic
stream1
.merge(stream2)
.merge(stream3) // merge new topic
.groupByKey
.reduce(reduceValues)(Materialized.as("KSTREAM-REDUCE-STATE-STORE-0000000003")
.toStream
.to("result-topic")
它似乎有效,但我不确定我是否在干扰 Kafka 内部,因为:
- 我正在使用 Kafka 自动生成的自定义名称(名称冲突的可能性?)
- 用于提供此
StateStore
的流集与最初不同。
有意见吗?
老实说,最安全的选择是为此状态添加人类可读的名称,但正如您提到的,您将失去它。
我认为您所做的应该没有任何问题(至少在您引入另一个代码更改之前:))。 ID 0000000003
将被分配给 groupByKey
运算符,因此不会有任何冲突(尽管我不是 100% 确定那里的 Kafka Streams 内部结构)。
还有 Application Reset Tool 允许您重新生成聚合。但我不知道它是否适用于您的情况:您对输入主题的保留政策可能会阻止此工具重新生成精确的聚合。
我有一个现有的流,它使用两个主题作为其来源:
val streamsBuilder = new StreamsBuilder
val stream1 = streamsBuilder.stream[K, V]("topic1")
val stream2 = streamsBuilder.stream[K, V]("topic2")
stream1
.merge(stream2)
.groupByKey
.reduce(reduceValues)
.toStream
.to("result-topic")
StateStore
的自动生成名称是 KSTREAM-REDUCE-STATE-STORE-0000000003
。
现在我需要再添加一个主题作为来源。但是,添加新源会增加 a kafka-internal number,导致 StateStore
变为 KSTREAM-REDUCE-STATE-STORE-0000000005
。我不想失去当前状态,所以我明确提供了旧的名称 StateStore
:
val streamsBuilder = new StreamsBuilder
val stream1 = streamsBuilder.stream[K, V]("topic1")
val stream2 = streamsBuilder.stream[K, V]("topic2")
val stream3 = streamsBuilder.stream[K, V]("topic3") // new topic
stream1
.merge(stream2)
.merge(stream3) // merge new topic
.groupByKey
.reduce(reduceValues)(Materialized.as("KSTREAM-REDUCE-STATE-STORE-0000000003")
.toStream
.to("result-topic")
它似乎有效,但我不确定我是否在干扰 Kafka 内部,因为:
- 我正在使用 Kafka 自动生成的自定义名称(名称冲突的可能性?)
- 用于提供此
StateStore
的流集与最初不同。
有意见吗?
老实说,最安全的选择是为此状态添加人类可读的名称,但正如您提到的,您将失去它。
我认为您所做的应该没有任何问题(至少在您引入另一个代码更改之前:))。 ID 0000000003
将被分配给 groupByKey
运算符,因此不会有任何冲突(尽管我不是 100% 确定那里的 Kafka Streams 内部结构)。
还有 Application Reset Tool 允许您重新生成聚合。但我不知道它是否适用于您的情况:您对输入主题的保留政策可能会阻止此工具重新生成精确的聚合。