Kafka中如何根据另一条记录的处理结果来处理一条记录?

How to process a record in Kafka based on the processing result of another record?

我有一个 @KafkaListener class 监听特定主题并使用包含 Person 对象或 Phone 对象(并且只有一个他们)。每个 Phone 都有对应 Person 的参考/关联 ID。侦听器 class 执行某些特定于接收到的类型的验证,将对象保存到数据库中并生成传输成功/失败的响应返回给另一个服务使用的 Kafka。

所以一个Person可以在没有任何对应的Phone的情况下成功传输,但是一个Phone传输应该只有在相应的Person传输成功的情况下才能成功。我不知道如何实现这个 "synchronization",因为 Persons 和 Phones 作为单独的记录独立进入 Kafka,并且不能保证 Person对应于特定 Phone 将在 Phone.

之前处理

在给定当前架构的情况下是否有可能实现这样的同步,或者我是否应该重新设计生产者并将 Person / Phone 对作为单独的类型发送?

谢谢。

不清楚您如何对不同的对象类型使用相同的序列化程序,但您可能应该创建单独的主题 and/or 将当前的主题分为两个(请参阅 Kafka Streams API)

我假设人数少于 phones,在这种情况下,您可以从人物主题构建 KTable,然后当您获得 phone 条记录时,您可以执行左连接或针对此 table 查找某人 ID

其他解决方案可能涉及使用 Kafka Connect 将记录转储到您可以进行连接的系统中