jsontostructs 到 spark 结构化流中的行
jsontostructs to Row in spark structured streaming
我正在使用 Spark 2.2,我正在尝试从 Kafka 读取 JSON 消息,将它们转换为 DataFrame
并将它们作为 Row
:
spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topic")
.load()
.select(col("value").cast(StringType).as("col"))
.writeStream()
.format("console")
.start();
有了这个我可以实现:
+--------------------+
| col|
+--------------------+
|{"myField":"somet...|
+--------------------+
我想要更像这样的东西:
+--------------------+
| myField|
+--------------------+
|"something" |
+--------------------+
我尝试使用 from_json
函数使用 struct
:
DataTypes.createStructType(
new StructField[] {
DataTypes.createStructField("myField", DataTypes.StringType)
}
)
但我只得到:
+--------------------+
| jsontostructs(col)|
+--------------------+
|[something] |
+--------------------+
然后我尝试使用 explode
但我只得到异常说:
cannot resolve 'explode(`col`)' due to data type mismatch:
input to function explode should be array or map type, not
StructType(StructField(...
知道如何进行这项工作吗?
您快完成了,只是 select 正确。 from_json
returns 与架构匹配的 struct
列。如果架构(JSON 表示)如下所示:
{"type":"struct","fields":[{"name":"myField","type":"string","nullable":false,"metadata":{}}]}
你会得到等价于的嵌套对象:
root
|-- jsontostructs(col): struct (nullable = true)
| |-- myField: string (nullable = false)
您可以使用getField
(或getItem
)方法来select特定字段
df.select(from_json(col("col"), schema).getField("myField").alias("myField"));
或 .*
到 select struct
中的所有顶级字段:
df.select(from_json(col("col"), schema).alias("tmp")).select("tmp.*");
虽然对于单个 string
列,get_json_object
应该绰绰有余:
df.select(get_json_object(col("col"), "$.myField"));
我正在使用 Spark 2.2,我正在尝试从 Kafka 读取 JSON 消息,将它们转换为 DataFrame
并将它们作为 Row
:
spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topic")
.load()
.select(col("value").cast(StringType).as("col"))
.writeStream()
.format("console")
.start();
有了这个我可以实现:
+--------------------+
| col|
+--------------------+
|{"myField":"somet...|
+--------------------+
我想要更像这样的东西:
+--------------------+
| myField|
+--------------------+
|"something" |
+--------------------+
我尝试使用 from_json
函数使用 struct
:
DataTypes.createStructType(
new StructField[] {
DataTypes.createStructField("myField", DataTypes.StringType)
}
)
但我只得到:
+--------------------+
| jsontostructs(col)|
+--------------------+
|[something] |
+--------------------+
然后我尝试使用 explode
但我只得到异常说:
cannot resolve 'explode(`col`)' due to data type mismatch:
input to function explode should be array or map type, not
StructType(StructField(...
知道如何进行这项工作吗?
您快完成了,只是 select 正确。 from_json
returns 与架构匹配的 struct
列。如果架构(JSON 表示)如下所示:
{"type":"struct","fields":[{"name":"myField","type":"string","nullable":false,"metadata":{}}]}
你会得到等价于的嵌套对象:
root
|-- jsontostructs(col): struct (nullable = true)
| |-- myField: string (nullable = false)
您可以使用getField
(或getItem
)方法来select特定字段
df.select(from_json(col("col"), schema).getField("myField").alias("myField"));
或 .*
到 select struct
中的所有顶级字段:
df.select(from_json(col("col"), schema).alias("tmp")).select("tmp.*");
虽然对于单个 string
列,get_json_object
应该绰绰有余:
df.select(get_json_object(col("col"), "$.myField"));