如何访问嵌套架构列?
How to access nested schema column?
我有一个带有 JSON 的 Kafka 流媒体源,例如{"type":"abc","1":"23.2"}
.
查询给出以下异常:
org.apache.spark.sql.catalyst.parser.ParseException: extraneous
input '.1' expecting {<EOF>, .......}
== SQL ==
person.1
访问 "person.1"
的正确语法是什么?
我什至将 DoubleType
更改为 StringType
,但这也没有用。示例仅通过保留 person.type
并删除 selectExpr
:
中的 person.1
就可以正常工作
val personJsonDf = inputDf.selectExpr("CAST(value AS STRING)")
val struct = new StructType()
.add("type", DataTypes.StringType)
.add("1", DataTypes.DoubleType)
val personNestedDf = personJsonDf
.select(from_json($"value", struct).as("person"))
val personFlattenedDf = personNestedDf
.selectExpr("person.type", "person.1")
val consoleOutput = personNestedDf.writeStream
.outputMode("update")
.format("console")
.start()
我已经使用 person.*
解决了这个问题
+-----+--------+
|type | 1 |
+-----+--------+
|abc |23.2 |
+-----+--------+
有趣,因为 select($"person.1")
应该可以正常工作(但是您使用了 selectExpr
,这可能会混淆 Spark SQL)。
StructField(1,DoubleType,true)
将不起作用,因为类型实际上应该是 StringType
.
让我们看看...
$ cat input.json
{"type":"abc","1":"23.2"}
val input = spark.read.text("input.json")
scala> input.show(false)
+-------------------------+
|value |
+-------------------------+
|{"type":"abc","1":"23.2"}|
+-------------------------+
import org.apache.spark.sql.types._
val struct = new StructType()
.add("type", DataTypes.StringType)
.add("1", DataTypes.StringType)
val q = input.select(from_json($"value", struct).as("person"))
scala> q.show
+-----------+
| person|
+-----------+
|[abc, 23.2]|
+-----------+
val q = input.select(from_json($"value", struct).as("person")).select($"person.1")
scala> q.show
+----+
| 1|
+----+
|23.2|
+----+
我有一个带有 JSON 的 Kafka 流媒体源,例如{"type":"abc","1":"23.2"}
.
查询给出以下异常:
org.apache.spark.sql.catalyst.parser.ParseException: extraneous
input '.1' expecting {<EOF>, .......}
== SQL ==
person.1
访问 "person.1"
的正确语法是什么?
我什至将 DoubleType
更改为 StringType
,但这也没有用。示例仅通过保留 person.type
并删除 selectExpr
:
person.1
就可以正常工作
val personJsonDf = inputDf.selectExpr("CAST(value AS STRING)")
val struct = new StructType()
.add("type", DataTypes.StringType)
.add("1", DataTypes.DoubleType)
val personNestedDf = personJsonDf
.select(from_json($"value", struct).as("person"))
val personFlattenedDf = personNestedDf
.selectExpr("person.type", "person.1")
val consoleOutput = personNestedDf.writeStream
.outputMode("update")
.format("console")
.start()
我已经使用 person.*
+-----+--------+
|type | 1 |
+-----+--------+
|abc |23.2 |
+-----+--------+
有趣,因为 select($"person.1")
应该可以正常工作(但是您使用了 selectExpr
,这可能会混淆 Spark SQL)。
StructField(1,DoubleType,true)
将不起作用,因为类型实际上应该是 StringType
.
让我们看看...
$ cat input.json
{"type":"abc","1":"23.2"}
val input = spark.read.text("input.json")
scala> input.show(false)
+-------------------------+
|value |
+-------------------------+
|{"type":"abc","1":"23.2"}|
+-------------------------+
import org.apache.spark.sql.types._
val struct = new StructType()
.add("type", DataTypes.StringType)
.add("1", DataTypes.StringType)
val q = input.select(from_json($"value", struct).as("person"))
scala> q.show
+-----------+
| person|
+-----------+
|[abc, 23.2]|
+-----------+
val q = input.select(from_json($"value", struct).as("person")).select($"person.1")
scala> q.show
+----+
| 1|
+----+
|23.2|
+----+