Kafka Stream 处理过程中的外部系统查询
External system queries during Kafka Stream processing
我正在尝试为流式分析设计一个流式架构。
要求:
- RT 和 NRT 流数据输入
- 实现一些财务分析的流处理器
- RT 和 NRT 分析输出流
- 流处理期间引用数据请求
我正在探索用于流处理和 RT/NRT 实时消息传递的 Kafka 和 Kafka Streams。
我的问题是:我需要在流处理期间对外部系统(信息提供者、MongoDB 等)执行一些查询。根据外部系统特征,这些查询可以是同步和异步请求-响应。
我读过 this post 解释如何在处理过程中加入 KStream 和 KTable,这很有趣,但在这种情况下,KTable 不依赖于来自 KStream 的输入参数,它只是一个流式表示table.
我需要为每个 KStream 消息查询外部系统,将一些消息字段作为查询参数传递并使用查询结果丰富流式消息,然后将丰富的消息发布到输出主题。
是否有任何统一的范例来设计这种流处理?
有没有我最好使用的特定技术?请记住,查询可以是同步的,也可以是异步的。
我还想为这些外部系统设计包装器,实现一种分布式 RPC,可从 Kafka 流处理调用。
你能推荐 technology/framework 吗?
我正在考虑使用 Akka 参与者来分发查询响应器,但我不明白 Akka 是否适合请求-响应范式。
谢谢
关于对外系统的查询模式,你有多种可能:
- 推荐:使用Kafka Connect将您的数据从外部系统导入到Kafka,并将这些主题阅读为
KTable
s来做KStream-KTable
查找加入。
- 您可以在您的 UDF 代码中实现您自己的自定义查找连接。根据详细信息,您可以使用
KStream
方法 #mapValues()
、#map()
或较低级别的方法,如 #transform()
或 #process()
。因此,您手动打开与外部系统的连接,并对您处理的每条记录发出查找查询。
- 同步查找:如果您对外部系统进行同步调用,则无需考虑其他任何事情(例如,您可以使用
#mapValues()
来实现)
- async lookpus:对于外部系统的异步调用,更难正确(你应该非常小心——这不是推荐的模式,因为没有目前有图书馆支持)。 首先,你需要以可靠的方式记住你发出的所有异步调用(即你需要附加一个状态并写下你想要的每个请求在 之前 你实际上启动它)。 其次,在每个回调中,您需要以某种方式缓冲结果,并在稍后再次调用发出请求的同一运算符时处理它(不可能在异步中产生下游结果回调处理程序,但仅限于 UDF 代码内)。下游发出后,您可以从状态中删除请求。 第三,在失败案例后的恢复中,您需要检查未完成请求的状态并再次发出这些请求。还要记住,这种异步处理打破了一些内部 Streams 假设,例如关于记录主题偏移量的保证处理顺序。
比较这个关于偏移量提交的流中故障处理的问题:
我正在尝试为流式分析设计一个流式架构。 要求:
- RT 和 NRT 流数据输入
- 实现一些财务分析的流处理器
- RT 和 NRT 分析输出流
- 流处理期间引用数据请求
我正在探索用于流处理和 RT/NRT 实时消息传递的 Kafka 和 Kafka Streams。 我的问题是:我需要在流处理期间对外部系统(信息提供者、MongoDB 等)执行一些查询。根据外部系统特征,这些查询可以是同步和异步请求-响应。
我读过 this post 解释如何在处理过程中加入 KStream 和 KTable,这很有趣,但在这种情况下,KTable 不依赖于来自 KStream 的输入参数,它只是一个流式表示table.
我需要为每个 KStream 消息查询外部系统,将一些消息字段作为查询参数传递并使用查询结果丰富流式消息,然后将丰富的消息发布到输出主题。 是否有任何统一的范例来设计这种流处理? 有没有我最好使用的特定技术?请记住,查询可以是同步的,也可以是异步的。
我还想为这些外部系统设计包装器,实现一种分布式 RPC,可从 Kafka 流处理调用。 你能推荐 technology/framework 吗? 我正在考虑使用 Akka 参与者来分发查询响应器,但我不明白 Akka 是否适合请求-响应范式。
谢谢
关于对外系统的查询模式,你有多种可能:
- 推荐:使用Kafka Connect将您的数据从外部系统导入到Kafka,并将这些主题阅读为
KTable
s来做KStream-KTable
查找加入。 - 您可以在您的 UDF 代码中实现您自己的自定义查找连接。根据详细信息,您可以使用
KStream
方法#mapValues()
、#map()
或较低级别的方法,如#transform()
或#process()
。因此,您手动打开与外部系统的连接,并对您处理的每条记录发出查找查询。- 同步查找:如果您对外部系统进行同步调用,则无需考虑其他任何事情(例如,您可以使用
#mapValues()
来实现) - async lookpus:对于外部系统的异步调用,更难正确(你应该非常小心——这不是推荐的模式,因为没有目前有图书馆支持)。 首先,你需要以可靠的方式记住你发出的所有异步调用(即你需要附加一个状态并写下你想要的每个请求在 之前 你实际上启动它)。 其次,在每个回调中,您需要以某种方式缓冲结果,并在稍后再次调用发出请求的同一运算符时处理它(不可能在异步中产生下游结果回调处理程序,但仅限于 UDF 代码内)。下游发出后,您可以从状态中删除请求。 第三,在失败案例后的恢复中,您需要检查未完成请求的状态并再次发出这些请求。还要记住,这种异步处理打破了一些内部 Streams 假设,例如关于记录主题偏移量的保证处理顺序。
- 同步查找:如果您对外部系统进行同步调用,则无需考虑其他任何事情(例如,您可以使用
比较这个关于偏移量提交的流中故障处理的问题: