从 Kafka 状态存储中删除条目

Deleting an entry from Kafka state store

我们有一个流处理应用程序,在流中,我们提取一个 id 并将其存储在状态存储中。我的用例非常简单,我们存储 id,因为当我们收到请求时,我们将检查存储中是否存在该 id。我们遇到的问题是,所有的事件都会有一个唯一的 id,状态存储最终会导致内存不足。一段时间后不需要状态存储中的数据。

  1. 有没有办法进行一些配置,状态根据配置在一段时间后清除数据?

  2. 另一种方法是发送带有密钥的逻辑删除消息,以便从存储中删除该条目。但是,我觉得,不知何故还有更多工作要做,因为我们必须设法向主题发送另一条消息并处理它以删除条目

  3. 如果我们采用第二种方法,如何进行故障处理?
  4. 有没有办法从商店中手动删除条目。如果是这样,从状态存储中删除条目会产生什么影响?
  1. 不是自动取款机。 (比照https://issues.apache.org/jira/browse/KAFKA-4212)
  2. 这是正确的方法。
  3. 没有区别。如果您执行 KeyValueStore#delete() 条目将从存储中删除,并且逻辑删除将写入支持的变更日志主题。
  4. 队列是什么意思?