如何在结构化流中将 JSON 消息转换为 DataFrame?
How to convert JSON messages to DataFrame in Structured Streaming?
我有一个从 kafka 读取的 spark 流代码,但我的值是这样的:
"{ id:'1',name: 'John', address: 'Highway 37' }|{ id:'2',name: 'Marta', address: 'Highway 37' }|{ id:'3',name: 'Juan', address: 'Highway 37' }|{ id:'4',name: 'Erick', address: 'Highway 37' }|{ id:'6',name: 'Alex', address: 'Highway 37' }|{ id:'7',name: 'Juanjo', address: 'Highway 37' }|{ id:'8',name: 'Pam', address: 'Highway 37' }|{ id:'9',name: 'Paty', address: 'Highway 37' }|{ id:'10',name: 'Diana', address: 'Highway 37' }"
是json的字符串集合,这是我的kafka值:
val kafkaRawData = df.selectExpr("CAST(value AS string)")
但我想将这个字符串 json 数据集转换为数据框或数据集,我该怎么做???
您应该简单地使用具有以下签名的 from_json standard function and DataStreamWriter.foreachBatch 运算符:
foreachBatch(function: (Dataset[T], Long) ⇒ Unit): DataStreamWriter[T]
使用 from_json
可以将字符串转换为正确的 JSON,而 foreachBatch
可以让您根据 micro-batch.
访问 Datasets
使用 from_json
函数处理 json 字符串到对象的转换。为了简化这个,或者让它更通用,将示例 json 字符串保存在文件中以从中读取以推断模式。使用此架构并将其传递给函数。有时,对于复杂的 json 对象,使用 StructType
创建模式会变得很麻烦。
val schema = spark.read.json(sampleFilePath).schema
inputDF.selectExpr("CAST(value AS STRING) as data")
.select(from_json(col("data"),schema).as("data"))
我有一个从 kafka 读取的 spark 流代码,但我的值是这样的:
"{ id:'1',name: 'John', address: 'Highway 37' }|{ id:'2',name: 'Marta', address: 'Highway 37' }|{ id:'3',name: 'Juan', address: 'Highway 37' }|{ id:'4',name: 'Erick', address: 'Highway 37' }|{ id:'6',name: 'Alex', address: 'Highway 37' }|{ id:'7',name: 'Juanjo', address: 'Highway 37' }|{ id:'8',name: 'Pam', address: 'Highway 37' }|{ id:'9',name: 'Paty', address: 'Highway 37' }|{ id:'10',name: 'Diana', address: 'Highway 37' }"
是json的字符串集合,这是我的kafka值:
val kafkaRawData = df.selectExpr("CAST(value AS string)")
但我想将这个字符串 json 数据集转换为数据框或数据集,我该怎么做???
您应该简单地使用具有以下签名的 from_json standard function and DataStreamWriter.foreachBatch 运算符:
foreachBatch(function: (Dataset[T], Long) ⇒ Unit): DataStreamWriter[T]
使用 from_json
可以将字符串转换为正确的 JSON,而 foreachBatch
可以让您根据 micro-batch.
Datasets
使用 from_json
函数处理 json 字符串到对象的转换。为了简化这个,或者让它更通用,将示例 json 字符串保存在文件中以从中读取以推断模式。使用此架构并将其传递给函数。有时,对于复杂的 json 对象,使用 StructType
创建模式会变得很麻烦。
val schema = spark.read.json(sampleFilePath).schema
inputDF.selectExpr("CAST(value AS STRING) as data")
.select(from_json(col("data"),schema).as("data"))