如何从 KTable 中删除旧键

How to delete old keys from a KTable

我有一个生产者,它使用 A 或 B 键将记录发布到 Kafka 主题。

在流应用程序中,我将每个带有键 A 的记录平映射到带有键 U、V 或 W 的记录,并将每个带有键 B 的记录平映射到带有键 X、Y 或 Z 的记录。

每个平面地图操作创建的记录数各不相同。例如,具有键 A 的特定记录可能映射到具有键 U 的一个记录,但另一个可能映射到键 V 和键 W 之一。

我想为这些平面映射记录创建一个 KTable。但是,我希望这个KTable的key能够匹配key A和B的每条记录上次flat-map操作生成的key。

例如,如果在某个特定时刻,KTable 具有键 X、Y 和 U,然后发布键 B 的记录并映射到键 X 的记录,我希望 KTable只有 X 和 U 键。

如果有人对我如何做到这一点有任何建议,我将不胜感激。

您的 flatMap() 需要是有状态的,即您可以使用 flatTransfrom() 和状态存储来实现它。我们可以通过 KStream#toTable() 运算符(在 2.5 版本中添加)将 flatTransform() 的结果更新为结果 KTable

键值存储维护来自输入主题的原始数据。每次密钥更改时,您现在可以访问密钥的旧记录(来自商店)和新记录(输入 tranform()),并可以发出相应的记录来更新下游 KTable