窗口组后的Kafka流和数据
Kafka stream sum data after windowed group
我有一个仓库应用程序,我需要在其中按小时计算总库存。
所有项目移动数据都发送到kafka流(添加/删除)。
这意味着,我可以使用 windowed kafka 流获得每小时的汇总运动,就像这样
sourceStream
.mapValues((k, v) -> v.getType().equalsIgnoreCase("ADD") ? v.getQuantity() : -1 * v.getQuantity())
.groupByKey().windowedBy(TimeWindows.of(Duration.ofHours(1)))
.reduce(Long::sum, Materialized.with(stringSerde, longSerde)).toStream().to("hourly-movement");
但是如何根据这个聚合结果得到总库存?
例如,对于这个数据集,假设起始库存为零:
- 09:15 : +50 项
- 09:20 : +10 项
- 09:50 : +10 项
- 10:35 : -40 项
- 10:55 : -20 项
聚合流结果(window)是这样的:
- 项目@09:00/10:00 : 70
- 项目@10:00/11:00 : -60
我需要在前端创建小时图表,意味着我需要这个数据集:
- item@09:00/10:00 : 70 (初始+一小时移动)
- item@10:00/11:00 : 10(10:00 的项目 + 下一小时的移动,即 70 - 60)
我怎样才能得到这样的数据集?原始源码流来自stream-logistic-movement
.
阅读不同类型的窗口技术可能会有用。在您的情况下, sliding-time-windows 可能是解决方案。在此处检查备选方案:https://kafka.apache.org/25/documentation/streams/developer-guide/dsl-api.html#windowing
您似乎不想进行 窗口化 聚合,而是整体聚合但每小时发出当前结果。
因此,您根本不应使用 windowBy()
,而应使用 "regular" 非窗口聚合。聚合后,您可以使用 suppress()
定期发出结果:https://docs.confluent.io/current/streams/javadocs/org/apache/kafka/streams/kstream/Suppressed.html#untilTimeLimit-java.time.Duration-org.apache.kafka.streams.kstream.Suppressed.BufferConfig-
我有一个仓库应用程序,我需要在其中按小时计算总库存。
所有项目移动数据都发送到kafka流(添加/删除)。
这意味着,我可以使用 windowed kafka 流获得每小时的汇总运动,就像这样
sourceStream
.mapValues((k, v) -> v.getType().equalsIgnoreCase("ADD") ? v.getQuantity() : -1 * v.getQuantity())
.groupByKey().windowedBy(TimeWindows.of(Duration.ofHours(1)))
.reduce(Long::sum, Materialized.with(stringSerde, longSerde)).toStream().to("hourly-movement");
但是如何根据这个聚合结果得到总库存?
例如,对于这个数据集,假设起始库存为零:
- 09:15 : +50 项
- 09:20 : +10 项
- 09:50 : +10 项
- 10:35 : -40 项
- 10:55 : -20 项
聚合流结果(window)是这样的:
- 项目@09:00/10:00 : 70
- 项目@10:00/11:00 : -60
我需要在前端创建小时图表,意味着我需要这个数据集:
- item@09:00/10:00 : 70 (初始+一小时移动)
- item@10:00/11:00 : 10(10:00 的项目 + 下一小时的移动,即 70 - 60)
我怎样才能得到这样的数据集?原始源码流来自stream-logistic-movement
.
阅读不同类型的窗口技术可能会有用。在您的情况下, sliding-time-windows 可能是解决方案。在此处检查备选方案:https://kafka.apache.org/25/documentation/streams/developer-guide/dsl-api.html#windowing
您似乎不想进行 窗口化 聚合,而是整体聚合但每小时发出当前结果。
因此,您根本不应使用 windowBy()
,而应使用 "regular" 非窗口聚合。聚合后,您可以使用 suppress()
定期发出结果:https://docs.confluent.io/current/streams/javadocs/org/apache/kafka/streams/kstream/Suppressed.html#untilTimeLimit-java.time.Duration-org.apache.kafka.streams.kstream.Suppressed.BufferConfig-