Kafka Streams - 跳频 windows - 删除重复键

Kafka Streams - Hopping windows - deduplicate keys

我在 4 小时 window 上每 5 分钟进行一次跳跃 window 聚合。由于跳跃 windows 重叠,我得到了具有不同聚合值的重复键。

TimeWindows.of(240 * 60 * 1000L).advanceBy(5 * 60* 1000L)

如何消除具有重复数据的重复键或仅选择包含最新值的键。

2021 年 5 月更新: Kafka Streams API 从 2019 年 3 月开始支持 "final" window results nowadays, via a suppress() operator. See the previous docs link as well as the blog Kafka Streams’ Take on Watermarks and Triggers 了解详情。

After defining your windowed computation, you can suppress the intermediate results, emitting the final count for each user when the window is closed.

KGroupedStream<UserId, Event> grouped = ...;

grouped.windowedBy(TimeWindows.of(Duration.ofHours(1)).grace(ofMinutes(10)))
       .count()
       .suppress(Suppressed.untilWindowCloses(unbounded()))
       .filter((windowedUserId, count) -> count < 3)
       .toStream()
       .foreach((windowedUserId, count) -> sendAlert(windowedUserId.window(), windowedUserId.key(), count));

原始答案(当不使用上面的 suppress() 运算符时仍然适用):

如果我的理解正确,那么这是预期的行为。您没有看到“重复”键,但看到同一键的持续更新。

思考:

# Extreme case: record caches disabled (size set to 0)
alice->1, alice->2, alice->3, alice->4, ..., alice->100, ...

# With record cache enabled, you would see sth like this.
alice->23, alice->59, alice->100, ...

查看 http://docs.confluent.io/current/streams/developer-guide.html#streams-developer-guide-memory-management 中的解释,其中对此进行了更详细的描述。如果你想看到每个记录键的“重复”更少,你可以通过应用程序配置中的 cache.max.bytes.bufferingStreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG 增加记录缓存的大小(使用 DSL 时)。 commit.interval.ms.

也有相互作用

如果您想知道“为什么 Kafka Streams API 首先会以这种方式运行”,我推荐您之前发布的博客 post https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/本周。

除了 Michael 写的内容之外,在跳跃 windows 中还有另一层 'duplication'。由于 windows 重叠,后续 windows 发出的值可能相同。 例如,假设您有一个五分钟的 window 和一分钟的跳跃:{0..5},{1..6},{2..7} 等等。来自输入主题的给定记录可能属于不同的时间 windows。

这与翻滚 windows 相反,其中 windows 是不重叠的,因此每条记录都是单个 window 的一部分。不幸的是,翻滚 windows 并不适合所有用例;一个示例可以是聚合,其中具有相同键的两个记录落在两个后续 windows.

的边缘

使用跳跃 windows 时,有几种方法可以 'deduplicate'。一种方法是 'dedup' 下游。另一种方法是在 Kafka Streams 中执行此操作,但这仅与特定拓扑相关。如前所述,这些结果不是真正的重复,而是连续 windows 的结果。如果你只想要某个键最后 window 的结果,你可以这样写:

windowedKtable
.toStream((windowedKey, value) -> windowedKey.key())
.groupByKey()
.reduce((value1, value2) -> value1.lastestActivity() > value2.lastestActivity() ? value1 : value2)

我不会说这是最佳做法,只是一种在非常具体的情况下解决问题的方法。

更多关于在 Kafka Streams 中 windowing 的信息: https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#windowing

使用跳频window时,一个key同时存在多个时间windows。当新的日志产生时,聚合会同时改变这些时间 windows 的状态,因此在应用 toStream() 时,会向下游主题产生重复数据删除日志。

要获取最新时间 window 的结果,您必须提供一个 filter() 来过滤最新时间 window 更改日志,这里有一个示例描述如何获取最新 window 使用跳跃时的聚合结果 window.

hopping window aggregation with latest window result