Kafka:在x时间内没有更新时更新密钥

Kafka: update a key when there is no update in x amount of time

在使用 Kafka 时,有没有办法在 x 时间后更新密钥?

类似

records
    .groupByKey
    .windowedBy(
         TimeWindows
         .of(Duration.ofMinutes(5))
         .grace(Duration.ofMinutes(1))
         .advanceBy(Duration.ofMinutes(1))
    ).count()
    .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded())
    ).updateNotSeen(Duration.ofMinutes(30), (k) => (k, 0))

所以在这里,只要 30 分钟后没有看到记录,Kafka 就会发出一条新记录。 (由假设的 updateNotSeen 完成。)

在我的搜索中,我发现了 this 个未解决的问题,如果它存在的话,它允许我以某种方式做到这一点,但我不知道我现在会怎么做。

据我所知,这在 DSL(Java、Scala)中是不可能的。

在开箱即用地提供此类功能之前,您可以通过使用 Processor API of Kafka Streams, however. (The Processor API can similarly be used to implement custom join operations, for example.). In that case you'd not work with tables--which are a DSL-only abstraction--but with state stores (tables are backed by state stores, fwiw), which support direct read-write access from attached Processors or Transformers. Processors and transformers support punctuation 安排周期性操作来自行实现此类自定义功能,类似于 cron。在这样的计划操作期间,您可以检查是否有任何记录(由其记录键标识)在过去 30 分钟内没有看到更新,然后采取相应行动。

此外,了解您可以 combine the Processor API and the DSL(您目前为止一直在使用)也很有帮助。也就是说,您可以在大部分代码中继续使用 DSL,并且只在需要的时候和需要的地方 'plug in' 前面提到的 Processors/Transformers(来自处理器 API)。

希望对您有所帮助!