用 KStream 语义重组
regrouping with KStream semantics
使用 kafka-streams,我想通过某个键 K1
将元素 E
的流 S
分组,同时将同一键的所有值聚合到连接结果中 AGG
。这导致 KTable T1
。
根据聚合结果,该值应重新分区到另一个 KTable T2
,按从聚合结果 AGG
中获取的键 K2
分组。所以聚合结果应该为下一次重组生成密钥。
最后我只对一个KTableT2
感兴趣,其中key是K2
,value是AGG
但是,这不起作用。我只得到一个 KTable T
作为最后一个值。不是每个键的值 K2
我知道聚合的结果在一段时间后才会转发,所以我已经尝试将 commit.interval.ms
降低到 1 但无济于事。
我也尝试使用 through
并将中间结果写入流,但也没有成功。
val finalTable = streamBuilder.kstream("streamS")
.groupBy{ k, v -> createKey1(k, v) }
.aggregate(
{ Agg.empty() },
{ k, v, previousAgg ->
Agg.merge(previousAgg, v)
})
.toStream()
// .through("table1")
.groupBy { k1, agg -> agg.createKey2()}
.reduce{ _, agg -> agg }
对于包含以下值的流 S
:
key1="123", id="1", startNewGroup="false"
key1="234", id="2", startNewGroup="false"
key1="123", id="3", startNewGroup="false"
key1="123", id="4", startNewGroup="true"
key1="234", id="5", startNewGroup="false"
key1="123", id="6", startNewGroup="false"
key1="123", id="7", startNewGroup="false"
key1="123", id="8", startNewGroup="true"
我希望最终结果是具有以下最新键值的 KTable:
key: 123-1, value: 'key1="123", key2="123-1", ids="1,3"'
key: 234-2, value: 'key1="234", key2="234-2", ids="2,5"'
key: 123-4, value: 'key1="123", key2="123-4", ids="4,6,7"'
key: 123-8, value: 'key1="123", key2="123-8", ids="8"'
元素的原始流 S
首先按 key1
分组,其中聚合结果包含 groupby 键 key1
并添加一个包含组合的额外字段 key2
key1
与第一次出现的 id
。
每当聚合收到 startNewGroup
设置为 true
的值时,它 returns 聚合 key2
字段设置为 key1
和 id
的新值,有效地创建了一个新的子组。
在第二次重组中,我们按 key2
字段进行简单分组。
然而我们真正观察到的是:
key: 234-2, value: 'key1="234", key2="234-2", ids="2,5"'
key: 123-8, value: 'key1="123", key2="123-8", ids="8"'
对于您的用例,最好使用 Processor API. Processor API can be easily combine with Kafka Streams DSL (Processor API integration)。
您必须实施自定义 Transformer,它将针对状态存储中的特定键聚合您的消息。当 startNewGroup=true
消息到达时 old key 的消息将转发到下游并且 new 聚合将开始
您的 Transformer 示例可能如下所示:
import org.apache.kafka.streams.kstream.Transformer
import org.apache.kafka.streams.processor.ProcessorContext
import org.apache.kafka.streams.state.KeyValueStore
case class CustomTransformer(storeName: String) extends Transformer[String, Value, Agg] {
private var stateStore: KeyValueStore[String, Agg] = null
private var context: ProcessorContext = null
override def init(context: ProcessorContext): Unit = {
this.context = context
stateStore = context.getStateStore(storeName).asInstanceOf[KeyValueStore[String, Agg]]
}
override def transform(key: String, value: Value): Agg = {
val maybeAgg = Option(stateStore.get(key))
if (value.startNewGroup) {
maybeAgg.foreach(context.forward(key, _))
stateStore.put(key, Agg(value))
}
else
stateStore.put(key, maybeAgg.map(_.merge(value)).getOrElse(Agg(value)))
null
}
override def close(): Unit = {}
}
使用 kafka-streams,我想通过某个键 K1
将元素 E
的流 S
分组,同时将同一键的所有值聚合到连接结果中 AGG
。这导致 KTable T1
。
根据聚合结果,该值应重新分区到另一个 KTable T2
,按从聚合结果 AGG
中获取的键 K2
分组。所以聚合结果应该为下一次重组生成密钥。
最后我只对一个KTableT2
感兴趣,其中key是K2
,value是AGG
但是,这不起作用。我只得到一个 KTable T
作为最后一个值。不是每个键的值 K2
我知道聚合的结果在一段时间后才会转发,所以我已经尝试将 commit.interval.ms
降低到 1 但无济于事。
我也尝试使用 through
并将中间结果写入流,但也没有成功。
val finalTable = streamBuilder.kstream("streamS")
.groupBy{ k, v -> createKey1(k, v) }
.aggregate(
{ Agg.empty() },
{ k, v, previousAgg ->
Agg.merge(previousAgg, v)
})
.toStream()
// .through("table1")
.groupBy { k1, agg -> agg.createKey2()}
.reduce{ _, agg -> agg }
对于包含以下值的流 S
:
key1="123", id="1", startNewGroup="false"
key1="234", id="2", startNewGroup="false"
key1="123", id="3", startNewGroup="false"
key1="123", id="4", startNewGroup="true"
key1="234", id="5", startNewGroup="false"
key1="123", id="6", startNewGroup="false"
key1="123", id="7", startNewGroup="false"
key1="123", id="8", startNewGroup="true"
我希望最终结果是具有以下最新键值的 KTable:
key: 123-1, value: 'key1="123", key2="123-1", ids="1,3"'
key: 234-2, value: 'key1="234", key2="234-2", ids="2,5"'
key: 123-4, value: 'key1="123", key2="123-4", ids="4,6,7"'
key: 123-8, value: 'key1="123", key2="123-8", ids="8"'
元素的原始流 S
首先按 key1
分组,其中聚合结果包含 groupby 键 key1
并添加一个包含组合的额外字段 key2
key1
与第一次出现的 id
。
每当聚合收到 startNewGroup
设置为 true
的值时,它 returns 聚合 key2
字段设置为 key1
和 id
的新值,有效地创建了一个新的子组。
在第二次重组中,我们按 key2
字段进行简单分组。
然而我们真正观察到的是:
key: 234-2, value: 'key1="234", key2="234-2", ids="2,5"'
key: 123-8, value: 'key1="123", key2="123-8", ids="8"'
对于您的用例,最好使用 Processor API. Processor API can be easily combine with Kafka Streams DSL (Processor API integration)。
您必须实施自定义 Transformer,它将针对状态存储中的特定键聚合您的消息。当 startNewGroup=true
消息到达时 old key 的消息将转发到下游并且 new 聚合将开始
您的 Transformer 示例可能如下所示:
import org.apache.kafka.streams.kstream.Transformer
import org.apache.kafka.streams.processor.ProcessorContext
import org.apache.kafka.streams.state.KeyValueStore
case class CustomTransformer(storeName: String) extends Transformer[String, Value, Agg] {
private var stateStore: KeyValueStore[String, Agg] = null
private var context: ProcessorContext = null
override def init(context: ProcessorContext): Unit = {
this.context = context
stateStore = context.getStateStore(storeName).asInstanceOf[KeyValueStore[String, Agg]]
}
override def transform(key: String, value: Value): Agg = {
val maybeAgg = Option(stateStore.get(key))
if (value.startNewGroup) {
maybeAgg.foreach(context.forward(key, _))
stateStore.put(key, Agg(value))
}
else
stateStore.put(key, maybeAgg.map(_.merge(value)).getOrElse(Agg(value)))
null
}
override def close(): Unit = {}
}