事务性发布到 Kafka
Transactional Publish to Kafka
总之,
我正在思考一个问题有一段时间了,虽然我确实有一个概念性的解决方案,但我不确定,没有比这更简单的解决方案了,尤其是如果已经有一个库封装了这样的解决方案。
基本上,如果数据库事务提交,我想向 Kafka 发送消息。此消息必须可靠地存储,以便没有中断(网络或电源)可以防止此消息丢失。不能使用 XA。
显然,这很难。如果在发送消息之前提交到数据库的事务,则可能会出现许多问题:应用程序服务器可能会断电,网络连接可能会在最不方便的时刻中断等等。如果在提交数据库之前向 Kafka 发送消息也是如此。
我的解决方案草案如下所示:
- 在数据库中创建了一个额外的
event
table
- 不是向Kafka发送消息,而是将事件存储在此table
- 第二个进程读取此 table 并将消息发布到 Kafka
第二个过程以日记形式工作:
- 阅读一条未发送的消息,将此消息标记为
in progress
,提交到数据库
- Post 消息到 Kafka
- 如果不成功:重做,否则:
- 读取来自 Kafka 的消息
- 将消息标记为
sent
如果发生系统中断,这里的技巧是重新读取来自 Kafka 的消息以进行恢复。所有无法从 Kafka 读取的消息必须被视为丢失然后重新发送。
不,我想,我不是唯一不能丢失消息的人,所以我无法想象这是必须手动实现的东西。我说得对吗?
另一种解决方案是 运行 CDC kafka 连接以将数据从数据库 table 流式传输到主题。 CDC 连接器仅提供一次交付,您不会遇到重复的问题
这也涉及两阶段提交引起的问题。
数据库充当真相的来源。 CDC 跟踪交易日志。即使连接器停止,重新启动时它也会从失败的地方开始并最终与数据库状态同步
经过更多的咨询和思考,这是某种 XY 问题。我最初问的问题是“我如何使用 Kafka 实现同步语义”——你不应该这样。
考虑到应用程序的业务部分,我们还没有找到实际需要恰好一次语义的用例。在某些情况下,丢失消息是可以接受的,在其他情况下,我们可以忍受幻影消息。
如果您确实有 exactly-once 的要求,您肯定会发现,您还有进一步的要求,要求同步调用,就像您需要实际确认一样,操作已执行。帮自己一个忙,尽量不要使用消息系统来实现它。
总之,
我正在思考一个问题有一段时间了,虽然我确实有一个概念性的解决方案,但我不确定,没有比这更简单的解决方案了,尤其是如果已经有一个库封装了这样的解决方案。
基本上,如果数据库事务提交,我想向 Kafka 发送消息。此消息必须可靠地存储,以便没有中断(网络或电源)可以防止此消息丢失。不能使用 XA。
显然,这很难。如果在发送消息之前提交到数据库的事务,则可能会出现许多问题:应用程序服务器可能会断电,网络连接可能会在最不方便的时刻中断等等。如果在提交数据库之前向 Kafka 发送消息也是如此。
我的解决方案草案如下所示:
- 在数据库中创建了一个额外的
event
table - 不是向Kafka发送消息,而是将事件存储在此table
- 第二个进程读取此 table 并将消息发布到 Kafka
第二个过程以日记形式工作:
- 阅读一条未发送的消息,将此消息标记为
in progress
,提交到数据库 - Post 消息到 Kafka
- 如果不成功:重做,否则:
- 读取来自 Kafka 的消息
- 将消息标记为
sent
如果发生系统中断,这里的技巧是重新读取来自 Kafka 的消息以进行恢复。所有无法从 Kafka 读取的消息必须被视为丢失然后重新发送。
不,我想,我不是唯一不能丢失消息的人,所以我无法想象这是必须手动实现的东西。我说得对吗?
另一种解决方案是 运行 CDC kafka 连接以将数据从数据库 table 流式传输到主题。 CDC 连接器仅提供一次交付,您不会遇到重复的问题 这也涉及两阶段提交引起的问题。 数据库充当真相的来源。 CDC 跟踪交易日志。即使连接器停止,重新启动时它也会从失败的地方开始并最终与数据库状态同步
经过更多的咨询和思考,这是某种 XY 问题。我最初问的问题是“我如何使用 Kafka 实现同步语义”——你不应该这样。
考虑到应用程序的业务部分,我们还没有找到实际需要恰好一次语义的用例。在某些情况下,丢失消息是可以接受的,在其他情况下,我们可以忍受幻影消息。
如果您确实有 exactly-once 的要求,您肯定会发现,您还有进一步的要求,要求同步调用,就像您需要实际确认一样,操作已执行。帮自己一个忙,尽量不要使用消息系统来实现它。