在 kstream 中实现 Monthwise Window 存储

Implementing Monthwise Window store in kstream

我需要为 kstream 实施每月 windows。我尝试实施相同的方法 由于一个月中的天数不同,我无法创建 window 不同大小的商店。如果有人有实施每月 window 的想法,请帮助我。

这是我实现的一段代码

 KTable<Windowed<String>, Customer> tableStream =  input.groupByKey().windowedBy(TimeWindows.of(Duration.ofDays(calendar.getActualMaximum(Calendar.DAY_OF_MONTH)))).aggregate(() -> new Customer()
                   .withMsisdn(null)
                   .withCustName(null)
                   .withRechargeAmount(0)
                   .withCreatedTime(null),
           //Aggregator
           (k, v, aggV) -> new Customer()
                   .withMsisdn(v.getMsisdn())
                   .withCustName(v.getCustName())
                   .withRechargeAmount(aggV.getRechargeAmount() + v.getRechargeAmount())
                   .withCreatedTime(v.getCreatedTime()),
           //Serializer
           Materialized.<String, Customer, WindowStore<Bytes, byte[]>>as("cust-store2").withValueSerde(AppSerdes.Customer()));
   tableStream.toStream().foreach(
           (wKey, value) -> log.info("MONTH-WISE "+
                   "Store ID: " + wKey.key() + " Window ID: " + wKey.window().hashCode() +
                   " Window start: " + Instant.ofEpochMilli(wKey.window().start()).atOffset(ZoneOffset.UTC) +
                   " Window end: " + Instant.ofEpochMilli(wKey.window().end()).atOffset(ZoneOffset.UTC) +
                   " Count: " + value
           )
   );

谢谢

不幸的是,基于日历的 windows 不包含在 Kafka Streams 中。有一个 open ticket 请求此功能。 在 Kafka Streams 中没有基于日历的 windows 的根本原因是 window 长度是如何在 window 存储的键中编码的。

但是,我可以想象您可以在具有连接键值存储的自定义 KStream#transform() 中实施每月 windows。