将值发送到输出主题后清除 KTable 条目

Purge KTable entries after sending values to output topic

我有一个数据库,用于存储每个网页的综合浏览量。它通过使用名为 pageviews 的 Kafka 主题来做到这一点,其中每条消息的页面名称为 keyvalue 为自上一条消息以来的视图数 .

这是 pageviews 主题中预期的消息示例:

浏览量主题:

key: "index", value: 349
key: "products", value: 67
key: "index", value: 15
key: "about", value: 11
...

pageviews 的消费者每次将上述 values 添加到 PAGEVIEWS table。

现在,我正在构建 pageviews 主题的制作人。此应用程序的数据源是 viewstream 主题,其中每个视图创建一条消息,例如:

viewstream 主题:

key: "index", value: <timestamp>
key: "index", value: <timestamp>
key: "product", value: <timestamp>
...

在 Kafka Stream 应用程序上,我有以下拓扑:

PageViewsStreamer:

builder.stream("viewstream")
    .groupByKey()
    .aggregate(...) // this builds a KTable with the sums of views per page
    .toStream()
    .to("pageviews")

此拓扑有 2 个问题:

  1. 保存聚合的 KTable 在向 pageviews 生成输出消息后不会获得 reset/purge,因此只需将聚合值添加到 DB table 我们得到错误的结果。我怎样才能使发送到 pageviews 的每条消息不包含之前消息中已发送的视图?

  2. 我希望每 15 分钟发送一次 pageviews 消息(默认速率大约是每 30 秒一次)。

我正在尝试为两者都使用窗口,但到目前为止我都失败了。

您可以使用 15 分钟的翻滚 windows 来实现此行为,并在 windows 时间过去之前抑制结果(请记住添加一个宽限时间来限制事件的延迟前一个 window 将接受)。查看详细信息 here。 我会做这样的事情:

builder.stream("viewstream")
                .groupByKey()
                //window by a 15-minute time windows, accept event late in 30 second, you can set grace time smaller
                .windowedBy(TimeWindows.of(Duration.ofMinutes(15)).grace(Duration.ofSeconds(30)))
                .aggregate(...) // this builds a KTable with the sums of views per page
                .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
                .toStream()
                //re-select key : from window to key
                .selectKey((key, value) -> key.key())
                .to("pageviews");