使用结构化流处理来自 kafka 的 json 数据
Processing json data from kafka using structured streaming
我想将来自 Kafka 的传入 JSON 数据转换为数据帧。
我正在使用 Scala 2.12
的结构化流媒体
大多数人添加硬编码模式,但如果 json 可以有额外的字段,则每次都需要更改代码库,这很乏味。
一种方法是将其写入文件并使用 进行推断,但我宁愿避免这样做。
还有其他方法可以解决这个问题吗?
编辑:找到了一种将 json 字符串转换为数据帧的方法,但无法从流源中提取它,是否可以提取它?
以字符串形式读取数据,然后将其转换为 map[string,String],这样您就可以在不知道其架构
的情况下处理任何 json
一种方法是将模式本身存储在消息 headers 中(而不是在键或值中)。
尽管这会增加消息大小,但无需任何外部资源(如文件或模式注册表)即可轻松解析 JSON 值。
新消息可以有新的模式,同时旧消息仍然可以使用它们的旧模式本身来处理,因为模式在消息本身中。
或者,您可以 版本 模式并为消息中的每个模式包含一个 id
headers(或)一个键或值中的魔术字节并从那里推断架构。
Confluent Schema registry遵循这种方法。它允许您基本上浏览同一架构的不同版本,并查看您的架构如何随时间演变。
基于 JavaTechnical 答案,最好的方法是使用模式注册表和
avro 数据而不是 json,没有绕过硬编码模式(现在)。
将您的架构名称和 ID 作为 header 并使用它们从架构注册表中读取架构。
使用 from_avro
函数将该数据转换为 df!
我想将来自 Kafka 的传入 JSON 数据转换为数据帧。
我正在使用 Scala 2.12
大多数人添加硬编码模式,但如果 json 可以有额外的字段,则每次都需要更改代码库,这很乏味。
一种方法是将其写入文件并使用 进行推断,但我宁愿避免这样做。
还有其他方法可以解决这个问题吗?
编辑:找到了一种将 json 字符串转换为数据帧的方法,但无法从流源中提取它,是否可以提取它?
以字符串形式读取数据,然后将其转换为 map[string,String],这样您就可以在不知道其架构
的情况下处理任何 json一种方法是将模式本身存储在消息 headers 中(而不是在键或值中)。
尽管这会增加消息大小,但无需任何外部资源(如文件或模式注册表)即可轻松解析 JSON 值。
新消息可以有新的模式,同时旧消息仍然可以使用它们的旧模式本身来处理,因为模式在消息本身中。
或者,您可以 版本 模式并为消息中的每个模式包含一个
id
headers(或)一个键或值中的魔术字节并从那里推断架构。Confluent Schema registry遵循这种方法。它允许您基本上浏览同一架构的不同版本,并查看您的架构如何随时间演变。
基于 JavaTechnical 答案,最好的方法是使用模式注册表和 avro 数据而不是 json,没有绕过硬编码模式(现在)。
将您的架构名称和 ID 作为 header 并使用它们从架构注册表中读取架构。
使用 from_avro
函数将该数据转换为 df!