如何从 Kafka 处理器查询数据库?

How to query a database from a Kafka processor?

我正在尝试使用 KafkaKafka Streams 创建账户管理服务。

SignupRequest 消息放在 signup-requests 主题上,流中使用该主题的第一个处理器必须首先检查电子邮件的唯一性,这就是问题开始的地方,我正在考虑 2可能性,但我只是一个新手......

第一个是在 accounts 主题上创建一个 KTable,这样我就可以用它检查电子邮件的唯一性。但是我读到主题中的消息有时间离开,之后它们被删除。因此,如果使用已检查电子邮件的帐户创建时间早于配置的离开时间,那么它不应该出现在 KTable 中,我的验证将受到影响。

第二个选项是查询真正直接保存帐户的数据库,但是如何在 kafka 处理器中进行异步操作?这是一个好习惯吗?

I read that messages in a topic have a time to leave, after which they are deleted.

您可以根据时间(或主题的大小)定义保留,但您也可以将主题配置为 compacted。这是一个特殊的保留选项,这意味着对于每个 key,最新的消息总是被保留——不管它是什么时候收到的。因此,压缩主题非常适合位于 KTables 后面的主题。