Kafka中如何根据另一条记录的处理结果来处理一条记录?
How to process a record in Kafka based on the processing result of another record?
我有一个 @KafkaListener
class 监听特定主题并使用包含 Person
对象或 Phone
对象(并且只有一个他们)。每个 Phone
都有对应 Person
的参考/关联 ID。侦听器 class 执行某些特定于接收到的类型的验证,将对象保存到数据库中并生成传输成功/失败的响应返回给另一个服务使用的 Kafka。
所以一个Person
可以在没有任何对应的Phone
的情况下成功传输,但是一个Phone
传输应该只有在相应的Person
传输成功的情况下才能成功。我不知道如何实现这个 "synchronization",因为 Person
s 和 Phone
s 作为单独的记录独立进入 Kafka,并且不能保证 Person
对应于特定 Phone
将在 Phone
.
之前处理
在给定当前架构的情况下是否有可能实现这样的同步,或者我是否应该重新设计生产者并将 Person
/ Phone
对作为单独的类型发送?
谢谢。
不清楚您如何对不同的对象类型使用相同的序列化程序,但您可能应该创建单独的主题 and/or 将当前的主题分为两个(请参阅 Kafka Streams API)
我假设人数少于 phones,在这种情况下,您可以从人物主题构建 KTable,然后当您获得 phone 条记录时,您可以执行左连接或针对此 table 查找某人 ID
其他解决方案可能涉及使用 Kafka Connect 将记录转储到您可以进行连接的系统中
我有一个 @KafkaListener
class 监听特定主题并使用包含 Person
对象或 Phone
对象(并且只有一个他们)。每个 Phone
都有对应 Person
的参考/关联 ID。侦听器 class 执行某些特定于接收到的类型的验证,将对象保存到数据库中并生成传输成功/失败的响应返回给另一个服务使用的 Kafka。
所以一个Person
可以在没有任何对应的Phone
的情况下成功传输,但是一个Phone
传输应该只有在相应的Person
传输成功的情况下才能成功。我不知道如何实现这个 "synchronization",因为 Person
s 和 Phone
s 作为单独的记录独立进入 Kafka,并且不能保证 Person
对应于特定 Phone
将在 Phone
.
在给定当前架构的情况下是否有可能实现这样的同步,或者我是否应该重新设计生产者并将 Person
/ Phone
对作为单独的类型发送?
谢谢。
不清楚您如何对不同的对象类型使用相同的序列化程序,但您可能应该创建单独的主题 and/or 将当前的主题分为两个(请参阅 Kafka Streams API)
我假设人数少于 phones,在这种情况下,您可以从人物主题构建 KTable,然后当您获得 phone 条记录时,您可以执行左连接或针对此 table 查找某人 ID
其他解决方案可能涉及使用 Kafka Connect 将记录转储到您可以进行连接的系统中