Kafka 流 DSL:聚合、丰富和发送
Kafka streams DSL: aggregate, enrich and send through
我们有以下问题需要用 Kafka Streams 解决:
1- 收到消息。每条消息都标有 eventId(消息更新事件)和 correlationId(每条消息唯一)。
2- 从该消息中聚合一些状态(基于 eventId)并将其附加到本地存储中已有的状态
3- 丰富该消息以获得该事件的完整聚合状态,并将其发送到输出主题
重点是我们不能真正丢失一条消息,它必须始终使用最新的聚合状态(我们在消息处理期间实际评估)来丰富传入消息。
据我目前所见,我们不能只使用简单的聚合(类似的东西:)
stateMessageStream
.map((k, v) => new KeyValue[String, StateMessage](k, v))
.mapValues[StateMessageWithMarkets](sm => {StateMessageWithMarkets(Some(sm), extract(sm))})
.groupBy((k, _) => k, stringSerde, marketAggregatorSerde)
.aggregate[StateMessageWithMarkets](() => StateMessageWithMarkets(), (_, v, aggregatedState) => aggregatedState.updateModelMarketsWith(v), marketAggregatorSerde, kafkaStoreName)
.to(stringSerde, marketAggregatorSerde, kafkaOutTopic)
因为聚合仅按时间间隔生成新记录,这意味着对于两条传入消息,我们可能只生成一条聚合输出消息(因此我们丢失了一条消息)
我第二次尝试实现它基本上是两个流,一个用于聚合,第二个用于纯消息。最后,我们可以使用连接操作将两个流重新连接在一起,基于 correlationId 作为键 - 我们可以将正确的状态与正确的消息相匹配:
val aggregatedStream : KStream[String, MarketAggregator] = stateMessageStream
.map((k, v) => new KeyValue[String, StateMessage](k, v))
.mapValues[StateMessage](v => {
log.debug("Received State Message, gameId: " + v.metadata().gtpId() + ", correlationId: " + v.correlationId)
v})
.mapValues[MarketAggregator](sm => {MarketAggregator(sm.correlationId, extract(sm))})
.groupBy((k, v) => k, stringSerde, marketAggregatorSerde)
.aggregate[MarketAggregator](() => MarketAggregator(), (_, v, aggregatedState) => aggregatedState.updateModelMarketsWith(v), marketAggregatorSerde, kafkaStoreName)
.toStream((k, v) => v.correlationId)
stateMessageStream
.selectKey[String]((k, v) => v.correlationId)
.leftJoin[MarketAggregator, StateMessageWithMarkets](aggregatedStream, (stateMessage : StateMessage, aggregatedState : MarketAggregator) => StateMessageWithMarkets(Some(stateMessage), aggregatedState.modelMarkets, stateMessage.correlationId),
JoinWindows.of(10000),
stringSerde, stateMessageSerde, marketAggregatorSerde)
.mapValues[StateMessageWithMarkets](v => {
log.debug("Producing aggregated State Message, gameId: " + v.stateMessage.map(_.metadata().gtpId()).getOrElse("unknown") +
", correlationId: " + v.stateMessage.map(_.correlationId).getOrElse("unknown"))
v
})
.to(stringSerde, stateMessageWithMarketsSerde, kafkaOutTopic)
但是,这似乎也不起作用 - 对于两条传入消息,我仍然只收到一条消息,其中包含关于输出主题的最新聚合状态。
有人可以解释为什么以及正确的解决方案是什么吗?
您可以使用方法一并通过禁用缓存为每个输入消息获取输出消息。在您的 StreamsConfig
中,您只需将 StreamConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG
的值设置为零。
有关详细信息,请参阅 http://docs.confluent.io/current/streams/developer-guide.html#memory-management
我们有以下问题需要用 Kafka Streams 解决:
1- 收到消息。每条消息都标有 eventId(消息更新事件)和 correlationId(每条消息唯一)。
2- 从该消息中聚合一些状态(基于 eventId)并将其附加到本地存储中已有的状态
3- 丰富该消息以获得该事件的完整聚合状态,并将其发送到输出主题
重点是我们不能真正丢失一条消息,它必须始终使用最新的聚合状态(我们在消息处理期间实际评估)来丰富传入消息。
据我目前所见,我们不能只使用简单的聚合(类似的东西:)
stateMessageStream
.map((k, v) => new KeyValue[String, StateMessage](k, v))
.mapValues[StateMessageWithMarkets](sm => {StateMessageWithMarkets(Some(sm), extract(sm))})
.groupBy((k, _) => k, stringSerde, marketAggregatorSerde)
.aggregate[StateMessageWithMarkets](() => StateMessageWithMarkets(), (_, v, aggregatedState) => aggregatedState.updateModelMarketsWith(v), marketAggregatorSerde, kafkaStoreName)
.to(stringSerde, marketAggregatorSerde, kafkaOutTopic)
因为聚合仅按时间间隔生成新记录,这意味着对于两条传入消息,我们可能只生成一条聚合输出消息(因此我们丢失了一条消息)
我第二次尝试实现它基本上是两个流,一个用于聚合,第二个用于纯消息。最后,我们可以使用连接操作将两个流重新连接在一起,基于 correlationId 作为键 - 我们可以将正确的状态与正确的消息相匹配:
val aggregatedStream : KStream[String, MarketAggregator] = stateMessageStream
.map((k, v) => new KeyValue[String, StateMessage](k, v))
.mapValues[StateMessage](v => {
log.debug("Received State Message, gameId: " + v.metadata().gtpId() + ", correlationId: " + v.correlationId)
v})
.mapValues[MarketAggregator](sm => {MarketAggregator(sm.correlationId, extract(sm))})
.groupBy((k, v) => k, stringSerde, marketAggregatorSerde)
.aggregate[MarketAggregator](() => MarketAggregator(), (_, v, aggregatedState) => aggregatedState.updateModelMarketsWith(v), marketAggregatorSerde, kafkaStoreName)
.toStream((k, v) => v.correlationId)
stateMessageStream
.selectKey[String]((k, v) => v.correlationId)
.leftJoin[MarketAggregator, StateMessageWithMarkets](aggregatedStream, (stateMessage : StateMessage, aggregatedState : MarketAggregator) => StateMessageWithMarkets(Some(stateMessage), aggregatedState.modelMarkets, stateMessage.correlationId),
JoinWindows.of(10000),
stringSerde, stateMessageSerde, marketAggregatorSerde)
.mapValues[StateMessageWithMarkets](v => {
log.debug("Producing aggregated State Message, gameId: " + v.stateMessage.map(_.metadata().gtpId()).getOrElse("unknown") +
", correlationId: " + v.stateMessage.map(_.correlationId).getOrElse("unknown"))
v
})
.to(stringSerde, stateMessageWithMarketsSerde, kafkaOutTopic)
但是,这似乎也不起作用 - 对于两条传入消息,我仍然只收到一条消息,其中包含关于输出主题的最新聚合状态。
有人可以解释为什么以及正确的解决方案是什么吗?
您可以使用方法一并通过禁用缓存为每个输入消息获取输出消息。在您的 StreamsConfig
中,您只需将 StreamConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG
的值设置为零。
有关详细信息,请参阅 http://docs.confluent.io/current/streams/developer-guide.html#memory-management