KAFKA 从数据库初始加载
KAFKA initial load from database
刚开始玩kafka,从数据库导入数据的时候脑子里有很多问题
我的情况是:我有一个包含一些 table 的关系数据库,我想将它们导入到 KAFKA 主题中。我脑子里的困惑是:如果我使用 KAFKA connect 导入所有这些 table,我的消费者如何知道数据何时加载了 all?在我的消费者开始工作之前,我需要将来自不同 table 的大部分记录加载到 kafka 主题中。
例如,我的数据库中有一个客户和一个订单 table。我想将这 2 个 table 导入客户和订单主题。如果我的消费者因为有一个新订单而启动,但有关客户的信息尚未出现在客户主题中,则它将不起作用。我是不是太复杂了?
技术上:您的生产者和消费者需要同步机制。 Kafka 不提供让消费者知道 "topic is full" 的能力(因为可能总是有人写这个主题)。
生产者需要以某种方式(例如通过另一个 Kafka 主题?)通知消费者 "they have finished their phase"。只有在消费者收到该信息后,他们才能开始处理。
旁注:你提到
If my consumer starts because there is a new Order but the information about the customer is not yet present in the Customer topic, it will not work
你现在是怎么解决这个问题的?
Kafka 提供随时间变化的实时事件流。回覆。你的问题 "how will my consumers know when the data has all been loaded" - 你问的是完成一些有限的 activity (将数据加载到 Kafka?),但事件会随着时间的推移发生并继续发生。您的 Streams 应用程序不断 运行s 并继续 运行 - 没有 'end'.
您当前对 table 中的数据与 Kafka 主题中的事件的思考听起来像是您将它们视为两个等价的概念,但它们不是。
if I use KAFKA connect to import all these tables, how will my consumers know when the data has all been loaded?
正如其他答案所说,它们不会开箱即用。
您需要手动(或以编程方式)监控导入过程或确定 "completeness" 的初始条件,或者至少您有信心启动消费者。
从那里,我建议要么设置 CDC,要么让客户和订单服务直接写入 Kafka 主题。如果直接写入 Kafka,则需要考虑幂等事件,例如新客户帐户的排序、编辑和删除。那么如果客户被删除,你是否仍然保留与该客户相关的所有订单,例如?
一旦两个数据源都在主题中,欢迎您使用 KStreams / KSQL 加入客户订单
it will not work
您可能想弄清楚 "it" 是什么,但是来自至少一个主题的消费者可以正常工作。上面提到的连接将在导入数据时最终保持一致,但这是你在异步处理中没有事务性、原子语义的权衡
刚开始玩kafka,从数据库导入数据的时候脑子里有很多问题
我的情况是:我有一个包含一些 table 的关系数据库,我想将它们导入到 KAFKA 主题中。我脑子里的困惑是:如果我使用 KAFKA connect 导入所有这些 table,我的消费者如何知道数据何时加载了 all?在我的消费者开始工作之前,我需要将来自不同 table 的大部分记录加载到 kafka 主题中。
例如,我的数据库中有一个客户和一个订单 table。我想将这 2 个 table 导入客户和订单主题。如果我的消费者因为有一个新订单而启动,但有关客户的信息尚未出现在客户主题中,则它将不起作用。我是不是太复杂了?
技术上:您的生产者和消费者需要同步机制。 Kafka 不提供让消费者知道 "topic is full" 的能力(因为可能总是有人写这个主题)。
生产者需要以某种方式(例如通过另一个 Kafka 主题?)通知消费者 "they have finished their phase"。只有在消费者收到该信息后,他们才能开始处理。
旁注:你提到
If my consumer starts because there is a new Order but the information about the customer is not yet present in the Customer topic, it will not work
你现在是怎么解决这个问题的?
Kafka 提供随时间变化的实时事件流。回覆。你的问题 "how will my consumers know when the data has all been loaded" - 你问的是完成一些有限的 activity (将数据加载到 Kafka?),但事件会随着时间的推移发生并继续发生。您的 Streams 应用程序不断 运行s 并继续 运行 - 没有 'end'.
您当前对 table 中的数据与 Kafka 主题中的事件的思考听起来像是您将它们视为两个等价的概念,但它们不是。
if I use KAFKA connect to import all these tables, how will my consumers know when the data has all been loaded?
正如其他答案所说,它们不会开箱即用。
您需要手动(或以编程方式)监控导入过程或确定 "completeness" 的初始条件,或者至少您有信心启动消费者。
从那里,我建议要么设置 CDC,要么让客户和订单服务直接写入 Kafka 主题。如果直接写入 Kafka,则需要考虑幂等事件,例如新客户帐户的排序、编辑和删除。那么如果客户被删除,你是否仍然保留与该客户相关的所有订单,例如?
一旦两个数据源都在主题中,欢迎您使用 KStreams / KSQL 加入客户订单
it will not work
您可能想弄清楚 "it" 是什么,但是来自至少一个主题的消费者可以正常工作。上面提到的连接将在导入数据时最终保持一致,但这是你在异步处理中没有事务性、原子语义的权衡