我可以在聚合运行之前加入由 Kafka Streams #aggregate 调用生成的 KTable 吗?
Can I join the KTable produced by a Kafka Streams #aggregate call before the aggregation runs?
我有许多 IOT 设备通过消息向 kafka 主题报告事件,并且我定义了一个聚合器来从这些事件更新设备状态。
我想做的是能够在聚合更新状态之前将输入流连接到聚合器输出的 KTable——也就是说,我想将一个事件与当前的事件进行比较状态,如果它们匹配某个谓词,则进行一些处理,然后更新状态。
我试过先用 StreamsBuilder#addStateStore
创建状态存储,但是那个方法 returns 是一个 StreamsBuilder,似乎没有给我提供将它变成 KTable 的方法。
我已经尝试将输入流与 StreamsBuilder#aggregate
生成的 KTable 相结合,但这并没有达到我想要的效果,因为它只在聚合具有 运行,我希望它在聚合之前 运行。
// this is fine, but it returns a StreamsBuilder and I don't see how to get a KTable out of it
streamsBuilder.addStateStore(
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore(deviceStateAggregator),
Serdes.String(),
Serdes.String()
)
);
// this doesn't work because I only get doThingsBeforeStateUpdate called after the state is updated by the DeviceStateAggregator
KTable<String, DeviceState> deviceTable = deviceEventKStream
.groupByKey(Serialized.with(Serdes.String(), new deviceEventSerde()))
.aggregate(
() -> null,
new DeviceStateAggregator(),
Materialized.<String, DeviceState>as(stateStoreSupplier)
.withValueSerde(deviceStateSerde)
);
deviceEventKStream.join(deviceTable, (event, state) -> doThingsBeforeStateUpdate(event, state));
我希望能够在聚合器更新状态之前利用 Streams DSL 检查一些先决条件,但这似乎不可能。我目前正在探索使用处理器的想法,或者可能只是扩展我的 DeviceStateAggregator 来完成所有预聚合处理,但这对我来说感觉很尴尬,因为它迫使聚合关心那些看起来不关心的问题作为聚合的一部分是合理的。
that is, I want to, say, compare an event to the current state, and if they match a certain predicate, do some processing, and then update the state.
如果我正确理解您的问题,尤其是这句话,那么我会按照您的想法使用处理器 API 来实现它。您将需要实现一个 Transformer
(因为您希望它输出数据,而不仅仅是读取数据)。
作为您可以作为起点的示例应用程序,我建议您查看 MixAndMatch DSL + Processor API
和 CustomStreamTableJoin
中的示例 https://github.com/confluentinc/kafka-streams-examples。第二个示例展示了,尽管针对不同的用例,如何在处理处理器 API 中的状态时执行自定义 "if this then that" 逻辑,此外它还涵盖了连接功能,这是您想要做的事情,也是。
希望对您有所帮助!
我有许多 IOT 设备通过消息向 kafka 主题报告事件,并且我定义了一个聚合器来从这些事件更新设备状态。
我想做的是能够在聚合更新状态之前将输入流连接到聚合器输出的 KTable——也就是说,我想将一个事件与当前的事件进行比较状态,如果它们匹配某个谓词,则进行一些处理,然后更新状态。
我试过先用 StreamsBuilder#addStateStore
创建状态存储,但是那个方法 returns 是一个 StreamsBuilder,似乎没有给我提供将它变成 KTable 的方法。
我已经尝试将输入流与 StreamsBuilder#aggregate
生成的 KTable 相结合,但这并没有达到我想要的效果,因为它只在聚合具有 运行,我希望它在聚合之前 运行。
// this is fine, but it returns a StreamsBuilder and I don't see how to get a KTable out of it
streamsBuilder.addStateStore(
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore(deviceStateAggregator),
Serdes.String(),
Serdes.String()
)
);
// this doesn't work because I only get doThingsBeforeStateUpdate called after the state is updated by the DeviceStateAggregator
KTable<String, DeviceState> deviceTable = deviceEventKStream
.groupByKey(Serialized.with(Serdes.String(), new deviceEventSerde()))
.aggregate(
() -> null,
new DeviceStateAggregator(),
Materialized.<String, DeviceState>as(stateStoreSupplier)
.withValueSerde(deviceStateSerde)
);
deviceEventKStream.join(deviceTable, (event, state) -> doThingsBeforeStateUpdate(event, state));
我希望能够在聚合器更新状态之前利用 Streams DSL 检查一些先决条件,但这似乎不可能。我目前正在探索使用处理器的想法,或者可能只是扩展我的 DeviceStateAggregator 来完成所有预聚合处理,但这对我来说感觉很尴尬,因为它迫使聚合关心那些看起来不关心的问题作为聚合的一部分是合理的。
that is, I want to, say, compare an event to the current state, and if they match a certain predicate, do some processing, and then update the state.
如果我正确理解您的问题,尤其是这句话,那么我会按照您的想法使用处理器 API 来实现它。您将需要实现一个 Transformer
(因为您希望它输出数据,而不仅仅是读取数据)。
作为您可以作为起点的示例应用程序,我建议您查看 MixAndMatch DSL + Processor API
和 CustomStreamTableJoin
中的示例 https://github.com/confluentinc/kafka-streams-examples。第二个示例展示了,尽管针对不同的用例,如何在处理处理器 API 中的状态时执行自定义 "if this then that" 逻辑,此外它还涵盖了连接功能,这是您想要做的事情,也是。
希望对您有所帮助!