KStreams:使用处理器 API 实现会话 window

KStreams: implementing session window with pocessor API

我需要使用处理器 API 实现类似于会话 windows 的逻辑,以便完全控制状态存储。由于处理器 API 不提供 windowing 抽象,这需要手动完成。但是,我找不到 KStreams 会话 window 逻辑的源代码,无法获得一些初步想法(特别是关于会话超时)。

我期待使用 punctuate 方法,但它是每个处理器计时器而不是每个键计时器。此外 SessionStore<K, AGG> 不提供 API 来遍历数据库中的所有键。

[更新]

例如,假设处理器实例正在处理 K1 并且流时间增加,这导致 K2 的会话超时。 K2 可能存在也可能根本不存在。你怎么知道存在一个特定的键(比如 K2 当流时间增加时(同时处理不同的键)?换句话说,当流时间增加时,你如何找出哪个 windows 已过期(因为你不知道那些钥匙的存在)?

这是 DSL 代码:https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java -- 希望对您有所帮助。

虽然不清楚你的问题是什么 -- 它主要是陈述。所以让我试着给出一些一般性的答案。

在 DSL 中,会话根据 "stream time" 进度关闭。仅依赖于输入数据使操作具有确定性。使用挂钟时间会引入不确定性。因此,在 DSL 实现中不需要使用 Punctuation

Additionally SessionStore<K, AGG> doesn't provide an API to traverse the database for all keys.

DSL 中的会话基于键,因此在一段时间内按键扫描商店就足够了(通过 findSessions(...) 完成)。

更新:

在DSL中,每次更新会话window,相应的更新事件会立即发送到下游。因此,DSL 实现不会等待 "stream time" 进一步推进,而是立即发布 current(可能是中间)结果。

为了遵守宽限期,将记录时间戳与 "stream time" 进行比较,如果相应的会话 window 已经关闭,则跳过记录(参见 https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java#L146)。即,关闭 window 只是一个逻辑步骤(不是实际操作);会话仍将被存储,如果 window 关闭,则不需要向下游发送其他事件,因为最终结果已在上次更新 window 时向下游发送。

保留时间本身不能由 Processor 实现处理,因为它是 SessionStore 的内置功能:在内部,会话存储维护所谓的 "segments"存储会话一段时间。每次完成 put() 时,存储会检查是否可以删除旧段(基于 put() 提供的时间戳)。即,旧会话被延迟删除并作为批量删除(即,整个片段的所有会话将被立即删除)因为它比单独删除更有效。