状态存储分区迭代器?
State store partitioned iterator?
我有一个 Kafka-streams 转换器,它的功能类似于窗口程序:它在 transform()
中将状态累积到状态存储中,然后在 punctuate()
期间将其转发到输出主题中,并使用状态存储主题分区键与输入主题相同。
在 punctuate()
期间,我希望每个 StreamThread 只迭代它自己的状态存储分区,以最大限度地减少从支持 kafka 主题读取的数据量。但是我唯一能得到的迭代器是通过
org.apache.kafka.streams.state.ReadOnlyKeyValueStore<K,V>.all()
遍历整个状态存储。
有什么方法可以 "assign partitions" 状态存储并使 punctuate()
仅在分配的分区上迭代?
我想,ReadOnlyKeyValueStore<K,V>.all()
会如您所愿。请注意,整体状态被分成多个存储,每个分区一个 shard/store。 all()
不会遍历 "other shards"。 "all" 表示 "everything local",即单个分区的分片中的所有内容。
我有一个 Kafka-streams 转换器,它的功能类似于窗口程序:它在 transform()
中将状态累积到状态存储中,然后在 punctuate()
期间将其转发到输出主题中,并使用状态存储主题分区键与输入主题相同。
在 punctuate()
期间,我希望每个 StreamThread 只迭代它自己的状态存储分区,以最大限度地减少从支持 kafka 主题读取的数据量。但是我唯一能得到的迭代器是通过
org.apache.kafka.streams.state.ReadOnlyKeyValueStore<K,V>.all()
遍历整个状态存储。
有什么方法可以 "assign partitions" 状态存储并使 punctuate()
仅在分配的分区上迭代?
我想,ReadOnlyKeyValueStore<K,V>.all()
会如您所愿。请注意,整体状态被分成多个存储,每个分区一个 shard/store。 all()
不会遍历 "other shards"。 "all" 表示 "everything local",即单个分区的分片中的所有内容。