构建一个 Kafka Stream,将 returns 不同 ID 的列表放入时间间隔
Build a Kafka Stream that returns the list of distinct ids into time interval
我有一个 kafka 对象事件流:
KStream<String, VehicleEventTO> stream = builder.stream("mytopic", Consumed.with(Serdes.String(), new JsonSerde<>(MyObjectEvent.class)));
每个 ObjectEvent
都有一个 属性 idType
(长)。我需要构建一个 returns 不同 idTypes
的流到时间间隔(例如:10 分钟)。
有可能,使用 KafkaStream DSL?我没有找到解决方案。
我不太了解KafkaStream
的API,但关于一般流媒体api,
你有一个方法可以随着时间的推移缓冲消息(如 buffer
、groupedWithin
或类似的东西),你可以在其中指定时间(and/or 最大消息)。
然后你的流会是这样的:
KStream stream = builder.stream("mytopic", Consumed.with(Serdes.String(), new JsonSerde<>(MyObjectEvent.class)))
.map(record -> record.value().getId()) // assuming you get a stream of records, I don't know the KafkaStreams api
.groupedWithin(Duration.ofMinutes(10)) // <-- pseudocode, search for correct method
然后你会得到一个流,其中包含随时间变化的 ID。
根据您的用例,您正在寻找窗口聚合。 Kafka streams DSL有TimeWindowedKStream或SessionWindowdKStream应该可以解决你的问题。
我有一个 kafka 对象事件流:
KStream<String, VehicleEventTO> stream = builder.stream("mytopic", Consumed.with(Serdes.String(), new JsonSerde<>(MyObjectEvent.class)));
每个 ObjectEvent
都有一个 属性 idType
(长)。我需要构建一个 returns 不同 idTypes
的流到时间间隔(例如:10 分钟)。
有可能,使用 KafkaStream DSL?我没有找到解决方案。
我不太了解KafkaStream
的API,但关于一般流媒体api,
你有一个方法可以随着时间的推移缓冲消息(如 buffer
、groupedWithin
或类似的东西),你可以在其中指定时间(and/or 最大消息)。
然后你的流会是这样的:
KStream stream = builder.stream("mytopic", Consumed.with(Serdes.String(), new JsonSerde<>(MyObjectEvent.class)))
.map(record -> record.value().getId()) // assuming you get a stream of records, I don't know the KafkaStreams api
.groupedWithin(Duration.ofMinutes(10)) // <-- pseudocode, search for correct method
然后你会得到一个流,其中包含随时间变化的 ID。
根据您的用例,您正在寻找窗口聚合。 Kafka streams DSL有TimeWindowedKStream或SessionWindowdKStream应该可以解决你的问题。