解析 Spark Streaming SQL 中的嵌套 JSON 字符串化列
Parse nested JSON stringified column in Spark Streaming SQL
我遵循了 spark streaming 指南,并且能够使用 sqlContext.read.json(rdd)
获得我的 json 数据的 sql 上下文。问题是 json 字段之一是我想解析的 JSON 字符串本身。
有没有办法在 spark sql 中完成此操作,或者使用 ObjectMapper 来解析字符串并连接到其余数据会更容易吗?
澄清一下,JSON 的值之一是 字符串 包含 JSON 数据,内部引号被转义。我正在寻找一种方法来告诉解析器将该值视为字符串化 JSON
示例Json
{
"key": "val",
"jsonString": "{ \"too\": \"bad\" }",
"jsonObj": { "ok": "great" }
}
SQLContext 如何解析它
root
|-- key: string (nullable = true)
|-- jsonString: string (nullable = true)
|-- jsonObj: struct (nullable = true)
| |-- ok: string (nullable = true)
我多么希望
root
|-- key: string (nullable = true)
|-- jsonString: struct (nullable = true)
| |-- too: string (nullable = true)
|-- jsonObj: struct (nullable = true)
| |-- ok: string (nullable = true)
显然
"jsonString": "{ \"too\": \"bad\" }"
是无效的 json 数据,修复:并确保整个字符串是有效的 json 结构。
你提供的json是错误的,所以修正并给你一个例子。
让 json 如下所示。
{"key": "val","jsonString": {"too": "bad"},"jsonObj": {"ok": "great"}}
Spark SQL Json 解析器也将允许您读取嵌套的 json,坦率地说,如果没有提供,它将是不完整的,因为您将看到几乎 99 % 嵌套 jsons.
关于如何访问它,您需要 select 使用 . .在这里,jsonString.too 或 jsonObj.ok。
下面举例理解
scala> val df1 = sqlContext.read.json("/Users/srini/workspace/splunk_spark/file3.json").toDF
df1: org.apache.spark.sql.DataFrame = [jsonObj: struct<ok:string>, jsonString: struct<too:string>, key: string]
scala> df1.show
+-------+----------+---+
|jsonObj|jsonString|key|
+-------+----------+---+
|[great]| [bad]|val|
+-------+----------+---+
scala> df1.select("jsonString.too");
res12: org.apache.spark.sql.DataFrame = [too: string]
scala> df1.select("jsonString.too").show
+---+
|too|
+---+
|bad|
+---+
scala> df1.select("jsonObj.ok").show
+-----+
| ok|
+-----+
|great|
+-----+
希望您能理解。如果您需要更多信息,请回复。它只是父节点。子节点。而已。
较旧的 RDD API 方法(请参阅接受的 DataFrame 答案 API)
我最终使用 Jackson 解析 json 信封,然后再次解析内部转义字符串。
val parsedRDD = rdd.map(x => {
// Get Jackson mapper
val mapper = new ObjectMapper() with ScalaObjectMapper
mapper.registerModule(DefaultScalaModule)
// parse envelope
val envelopeMap = mapper.readValue[Map[String,Any]](x)
//println("the original envelopeMap", envelopeMap)
// parse inner jsonString value
val event = mapper.readValue[Map[String,Any]](envelopeMap.getOrElse("body", "").asInstanceOf[String])
// get Map that includes parsed jsonString
val parsed = envelopeMap.updated("jsonString", event)
// write entire map as json string
mapper.writeValueAsString(parsed)
})
val df = sqlContext.read.json(parsedRDD)
现在 parsedRDD 包含有效的 json 并且数据框正确地推断出整个模式。
我认为必须有一种方法可以避免序列化为 json 并再次解析,但到目前为止我没有看到任何在 RDD[Map[String] 上运行的 sqlContext APIs , 任何]]
您可以使用 from_json
函数来解析 DataSet 的列:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
val stringified = spark.createDataset(Seq("{ \"too\": \"bad\" }", "{ \"too\": \"sad\" }"))
stringified.printSchema()
val structified = stringified.withColumn("value", from_json($"value", StructType(Seq(StructField("too", StringType, false)))))
structified.printSchema()
将 [=13th=] 列从 string
转换为 struct
:
root
|-- value: string (nullable = true)
root
|-- value: struct (nullable = true)
| |-- too: string (nullable = false)
我遵循了 spark streaming 指南,并且能够使用 sqlContext.read.json(rdd)
获得我的 json 数据的 sql 上下文。问题是 json 字段之一是我想解析的 JSON 字符串本身。
有没有办法在 spark sql 中完成此操作,或者使用 ObjectMapper 来解析字符串并连接到其余数据会更容易吗?
澄清一下,JSON 的值之一是 字符串 包含 JSON 数据,内部引号被转义。我正在寻找一种方法来告诉解析器将该值视为字符串化 JSON
示例Json
{
"key": "val",
"jsonString": "{ \"too\": \"bad\" }",
"jsonObj": { "ok": "great" }
}
SQLContext 如何解析它
root
|-- key: string (nullable = true)
|-- jsonString: string (nullable = true)
|-- jsonObj: struct (nullable = true)
| |-- ok: string (nullable = true)
我多么希望
root
|-- key: string (nullable = true)
|-- jsonString: struct (nullable = true)
| |-- too: string (nullable = true)
|-- jsonObj: struct (nullable = true)
| |-- ok: string (nullable = true)
显然
"jsonString": "{ \"too\": \"bad\" }"
是无效的 json 数据,修复:并确保整个字符串是有效的 json 结构。
你提供的json是错误的,所以修正并给你一个例子。
让 json 如下所示。 {"key": "val","jsonString": {"too": "bad"},"jsonObj": {"ok": "great"}}
Spark SQL Json 解析器也将允许您读取嵌套的 json,坦率地说,如果没有提供,它将是不完整的,因为您将看到几乎 99 % 嵌套 jsons.
关于如何访问它,您需要 select 使用 . .在这里,jsonString.too 或 jsonObj.ok。
下面举例理解
scala> val df1 = sqlContext.read.json("/Users/srini/workspace/splunk_spark/file3.json").toDF
df1: org.apache.spark.sql.DataFrame = [jsonObj: struct<ok:string>, jsonString: struct<too:string>, key: string]
scala> df1.show
+-------+----------+---+
|jsonObj|jsonString|key|
+-------+----------+---+
|[great]| [bad]|val|
+-------+----------+---+
scala> df1.select("jsonString.too");
res12: org.apache.spark.sql.DataFrame = [too: string]
scala> df1.select("jsonString.too").show
+---+
|too|
+---+
|bad|
+---+
scala> df1.select("jsonObj.ok").show
+-----+
| ok|
+-----+
|great|
+-----+
希望您能理解。如果您需要更多信息,请回复。它只是父节点。子节点。而已。
较旧的 RDD API 方法(请参阅接受的 DataFrame 答案 API)
我最终使用 Jackson 解析 json 信封,然后再次解析内部转义字符串。
val parsedRDD = rdd.map(x => {
// Get Jackson mapper
val mapper = new ObjectMapper() with ScalaObjectMapper
mapper.registerModule(DefaultScalaModule)
// parse envelope
val envelopeMap = mapper.readValue[Map[String,Any]](x)
//println("the original envelopeMap", envelopeMap)
// parse inner jsonString value
val event = mapper.readValue[Map[String,Any]](envelopeMap.getOrElse("body", "").asInstanceOf[String])
// get Map that includes parsed jsonString
val parsed = envelopeMap.updated("jsonString", event)
// write entire map as json string
mapper.writeValueAsString(parsed)
})
val df = sqlContext.read.json(parsedRDD)
现在 parsedRDD 包含有效的 json 并且数据框正确地推断出整个模式。
我认为必须有一种方法可以避免序列化为 json 并再次解析,但到目前为止我没有看到任何在 RDD[Map[String] 上运行的 sqlContext APIs , 任何]]
您可以使用 from_json
函数来解析 DataSet 的列:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
val stringified = spark.createDataset(Seq("{ \"too\": \"bad\" }", "{ \"too\": \"sad\" }"))
stringified.printSchema()
val structified = stringified.withColumn("value", from_json($"value", StructType(Seq(StructField("too", StringType, false)))))
structified.printSchema()
将 [=13th=] 列从 string
转换为 struct
:
root
|-- value: string (nullable = true)
root
|-- value: struct (nullable = true)
| |-- too: string (nullable = false)