Spark JSON 动态关键字段的架构?
Spark JSON Schema for dynamic key fields?
我使用 from_json()
方法从 kafka 接收 JSON 数据。它需要我的架构。我的 JSON 结构是这样的;
{
"Items": {
"key1": [
{
"id": "",
"name": "",
"val": ""
}
],
"key2": [
{
"id": "",
"name": "",
"val": ""
}
],
"key3": [
{
"id": "",
"name": "",
"val": ""
}
]
}
}
处于这种状态; key1, key2, key3
字段未知。所以,他们是dynamic
。这些字段名称可能是 "abc", "def"
等。如何在 Spark Structured Streaming 中为此 JSON 定义 json 模式?
编辑:
例如另一个 json;
{
"Items": {
"stack": [
{
"id": "",
"name": "",
"val": ""
}
],
"over": [
{
"id": "",
"name": "",
"val": ""
}
],
"flow": [
{
"id": "",
"name": "",
"val": ""
}
]
}
}
key1
、key2
和 key3
不是归档。他们是关键的av值!这里的字段是 id
、name
、value
和 key
,key
的值可以是动态的,没关系,这里没有复杂性。
您需要定义架构来解析此类 json 文件。您的架构应如下所示。
val valSchema = new StructType()
.add("id", StringType)
.add("name", StringType)
.add("val", StringType)
val valArrSchema = new ArrayType(valSchema, true)
val mapSchema = new MapType(StringType, valArrSchema, true)
val jsonSchema = new StructType().add("Items", mapSchema)
您可以使用 from_json 方法解析 json。
val testDF = df.withColumn("json", from_json(col("value"),jsonSchema ))
我使用 from_json()
方法从 kafka 接收 JSON 数据。它需要我的架构。我的 JSON 结构是这样的;
{
"Items": {
"key1": [
{
"id": "",
"name": "",
"val": ""
}
],
"key2": [
{
"id": "",
"name": "",
"val": ""
}
],
"key3": [
{
"id": "",
"name": "",
"val": ""
}
]
}
}
处于这种状态; key1, key2, key3
字段未知。所以,他们是dynamic
。这些字段名称可能是 "abc", "def"
等。如何在 Spark Structured Streaming 中为此 JSON 定义 json 模式?
编辑: 例如另一个 json;
{
"Items": {
"stack": [
{
"id": "",
"name": "",
"val": ""
}
],
"over": [
{
"id": "",
"name": "",
"val": ""
}
],
"flow": [
{
"id": "",
"name": "",
"val": ""
}
]
}
}
key1
、key2
和 key3
不是归档。他们是关键的av值!这里的字段是 id
、name
、value
和 key
,key
的值可以是动态的,没关系,这里没有复杂性。
您需要定义架构来解析此类 json 文件。您的架构应如下所示。
val valSchema = new StructType()
.add("id", StringType)
.add("name", StringType)
.add("val", StringType)
val valArrSchema = new ArrayType(valSchema, true)
val mapSchema = new MapType(StringType, valArrSchema, true)
val jsonSchema = new StructType().add("Items", mapSchema)
您可以使用 from_json 方法解析 json。
val testDF = df.withColumn("json", from_json(col("value"),jsonSchema ))