在 kstream 中实现 Monthwise Window 存储
Implementing Monthwise Window store in kstream
我需要为 kstream 实施每月 windows。我尝试实施相同的方法 由于一个月中的天数不同,我无法创建 window 不同大小的商店。如果有人有实施每月 window 的想法,请帮助我。
这是我实现的一段代码
KTable<Windowed<String>, Customer> tableStream = input.groupByKey().windowedBy(TimeWindows.of(Duration.ofDays(calendar.getActualMaximum(Calendar.DAY_OF_MONTH)))).aggregate(() -> new Customer()
.withMsisdn(null)
.withCustName(null)
.withRechargeAmount(0)
.withCreatedTime(null),
//Aggregator
(k, v, aggV) -> new Customer()
.withMsisdn(v.getMsisdn())
.withCustName(v.getCustName())
.withRechargeAmount(aggV.getRechargeAmount() + v.getRechargeAmount())
.withCreatedTime(v.getCreatedTime()),
//Serializer
Materialized.<String, Customer, WindowStore<Bytes, byte[]>>as("cust-store2").withValueSerde(AppSerdes.Customer()));
tableStream.toStream().foreach(
(wKey, value) -> log.info("MONTH-WISE "+
"Store ID: " + wKey.key() + " Window ID: " + wKey.window().hashCode() +
" Window start: " + Instant.ofEpochMilli(wKey.window().start()).atOffset(ZoneOffset.UTC) +
" Window end: " + Instant.ofEpochMilli(wKey.window().end()).atOffset(ZoneOffset.UTC) +
" Count: " + value
)
);
谢谢
不幸的是,基于日历的 windows 不包含在 Kafka Streams 中。有一个 open ticket 请求此功能。
在 Kafka Streams 中没有基于日历的 windows 的根本原因是 window 长度是如何在 window 存储的键中编码的。
但是,我可以想象您可以在具有连接键值存储的自定义 KStream#transform()
中实施每月 windows。
我需要为 kstream 实施每月 windows。我尝试实施相同的方法 由于一个月中的天数不同,我无法创建 window 不同大小的商店。如果有人有实施每月 window 的想法,请帮助我。
这是我实现的一段代码
KTable<Windowed<String>, Customer> tableStream = input.groupByKey().windowedBy(TimeWindows.of(Duration.ofDays(calendar.getActualMaximum(Calendar.DAY_OF_MONTH)))).aggregate(() -> new Customer()
.withMsisdn(null)
.withCustName(null)
.withRechargeAmount(0)
.withCreatedTime(null),
//Aggregator
(k, v, aggV) -> new Customer()
.withMsisdn(v.getMsisdn())
.withCustName(v.getCustName())
.withRechargeAmount(aggV.getRechargeAmount() + v.getRechargeAmount())
.withCreatedTime(v.getCreatedTime()),
//Serializer
Materialized.<String, Customer, WindowStore<Bytes, byte[]>>as("cust-store2").withValueSerde(AppSerdes.Customer()));
tableStream.toStream().foreach(
(wKey, value) -> log.info("MONTH-WISE "+
"Store ID: " + wKey.key() + " Window ID: " + wKey.window().hashCode() +
" Window start: " + Instant.ofEpochMilli(wKey.window().start()).atOffset(ZoneOffset.UTC) +
" Window end: " + Instant.ofEpochMilli(wKey.window().end()).atOffset(ZoneOffset.UTC) +
" Count: " + value
)
);
谢谢
不幸的是,基于日历的 windows 不包含在 Kafka Streams 中。有一个 open ticket 请求此功能。 在 Kafka Streams 中没有基于日历的 windows 的根本原因是 window 长度是如何在 window 存储的键中编码的。
但是,我可以想象您可以在具有连接键值存储的自定义 KStream#transform()
中实施每月 windows。