在 Kafka 中处理具有依赖关系的数据时的最佳实践?
Best practice at the moment of processing data with dependencies in Kafka?
我们正在开发一个从不同来源获取数据的应用程序,一旦数据可用,我们就会对其进行处理,将其放在一起,然后继续将其移至不同的主题。
在我们的例子中,我们有 3 个主题,这些主题中的每一个都将带来与来自不同主题的数据有关系的数据,在这种情况下,生成的每个实体都可以同时接收或不接收(或很短的时间),这就是问题所在,因为在我们继续进入主题之前,需要将这 3 个实体合并为一个。
我们的想法是创建一个单独的主题,它将包含所有尚未处理的数据,然后有一个单独的线程将以固定的时间间隔检查该主题,并检查该主题的依赖关系可用,如果它们可用,那么我们从这个单独的主题中删除这个实体,如果不可用,我们将这个实体保留在那里直到它得到解决。
在所有这些解释的最后,我的问题是这样做是否合理,或者 Kafka 提供了其他好的实践或策略来解决这种情况?
根据保留策略,Kafka 消息可能会在一段时间后变得干净,因此您需要将消息存储在某个地方:
我可以看到下面的选项,但总是每个问题都有可能的方法和解决方案:
- 已处理所有消息并将“未处理消息”转发到其他主题,例如 A
- Kafka 处理器API 消费来自主题 A 的消息并存储到状态存储中
- 安排一个带有时间间隔的 punctuate() 方法
- 迭代存储在状态中的所有消息。
- 检查相关性(如果可用)从状态存储中删除消息并对其进行处理,或者发布回原始主题以再次处理。
可参考下方link参考
https://kafka.apache.org/10/documentation/streams/developer-guide/processor-api.html
我们正在开发一个从不同来源获取数据的应用程序,一旦数据可用,我们就会对其进行处理,将其放在一起,然后继续将其移至不同的主题。
在我们的例子中,我们有 3 个主题,这些主题中的每一个都将带来与来自不同主题的数据有关系的数据,在这种情况下,生成的每个实体都可以同时接收或不接收(或很短的时间),这就是问题所在,因为在我们继续进入主题之前,需要将这 3 个实体合并为一个。
我们的想法是创建一个单独的主题,它将包含所有尚未处理的数据,然后有一个单独的线程将以固定的时间间隔检查该主题,并检查该主题的依赖关系可用,如果它们可用,那么我们从这个单独的主题中删除这个实体,如果不可用,我们将这个实体保留在那里直到它得到解决。
在所有这些解释的最后,我的问题是这样做是否合理,或者 Kafka 提供了其他好的实践或策略来解决这种情况?
根据保留策略,Kafka 消息可能会在一段时间后变得干净,因此您需要将消息存储在某个地方:
我可以看到下面的选项,但总是每个问题都有可能的方法和解决方案:
- 已处理所有消息并将“未处理消息”转发到其他主题,例如 A
- Kafka 处理器API 消费来自主题 A 的消息并存储到状态存储中
- 安排一个带有时间间隔的 punctuate() 方法
- 迭代存储在状态中的所有消息。
- 检查相关性(如果可用)从状态存储中删除消息并对其进行处理,或者发布回原始主题以再次处理。
可参考下方link参考 https://kafka.apache.org/10/documentation/streams/developer-guide/processor-api.html