将具有模式的 spark Dataframe 转换为 json String 的数据帧
Convert spark Dataframe with schema to dataframe of json String
我有一个这样的数据框:
+--+--------+--------+----+-------------+------------------------------+
|id|name |lastname|age |timestamp |creditcards |
+--+--------+--------+----+-------------+------------------------------+
|1 |michel |blanc |35 |1496756626921|[[hr6,3569823], [ee3,1547869]]|
|2 |peter |barns |25 |1496756626551|[[ye8,4569872], [qe5,3485762]]|
+--+--------+--------+----+-------------+------------------------------+
我的 df 架构如下所示:
root
|-- id: string (nullable = true)
|-- name: string (nullable = true)
|-- lastname: string (nullable = true)
|-- age: string (nullable = true)
|-- timestamp: string (nullable = true)
|-- creditcards: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- id: string (nullable = true)
| | |-- number: string (nullable = true)
我想将每一行转换为了解我的架构的 json 字符串。所以这个数据框将有一个包含 json 的列字符串。
第一行应该是这样的:
{
"id":"1",
"name":"michel",
"lastname":"blanc",
"age":"35",
"timestamp":"1496756626921",
"creditcards":[
{
"id":"hr6",
"number":"3569823"
},
{
"id":"ee3",
"number":"1547869"
}
]
}
数据帧的第二行应该是这样的:
{
"id":"2",
"name":"peter",
"lastname":"barns",
"age":"25",
"timestamp":"1496756626551",
"creditcards":[
{
"id":"ye8",
"number":"4569872"
},
{
"id":"qe5",
"number":"3485762"
}
]
}
我的目标不是将数据帧写入 json 文件。我的目标是将 df1 转换为第二个 df2,以便将 df2 的每个 json 行推送到 kafka 主题
我有这个代码来创建数据框:
val line1 = """{"id":"1","name":"michel","lastname":"blanc","age":"35","timestamp":"1496756626921","creditcards":[{"id":"hr6","number":"3569823"},{"id":"ee3","number":"1547869"}]}"""
val line2 = """{"id":"2","name":"peter","lastname":"barns","age":"25","timestamp":"1496756626551","creditcards":[{"id":"ye8","number":"4569872"}, {"id":"qe5","number":"3485762"}]}"""
val rdd = sc.parallelize(Seq(line1, line2))
val df = sqlContext.read.json(rdd)
df show false
df printSchema
你有什么想法吗?
如果您只需要一个单列 DataFrame/Dataset,每个列值代表 JSON 中原始 DataFrame 的每一行,您可以简单地将 toJSON
应用到您的 DataFrame,如下所示:
df.show
// +---+------------------------------+---+--------+------+-------------+
// |age|creditcards |id |lastname|name |timestamp |
// +---+------------------------------+---+--------+------+-------------+
// |35 |[[hr6,3569823], [ee3,1547869]]|1 |blanc |michel|1496756626921|
// |25 |[[ye8,4569872], [qe5,3485762]]|2 |barns |peter |1496756626551|
// +---+------------------------------+---+--------+------+-------------+
val dsJson = df.toJSON
// dsJson: org.apache.spark.sql.Dataset[String] = [value: string]
dsJson.show
// +--------------------------------------------------------------------------+
// |value |
// +--------------------------------------------------------------------------+
// |{"age":"35","creditcards":[{"id":"hr6","number":"3569823"},{"id":"ee3",...|
// |{"age":"25","creditcards":[{"id":"ye8","number":"4569872"},{"id":"qe5",...|
// +--------------------------------------------------------------------------+
[更新]
要将 name
添加为附加列,您可以使用 from_json
:
从 JSON 列中提取它
val result = dsJson.withColumn("name", from_json($"value", df.schema)("name"))
result.show
// +--------------------+------+
// | value| name|
// +--------------------+------+
// |{"age":"35","cred...|michel|
// |{"age":"25","cred...| peter|
// +--------------------+------+
为此,您可以使用
将数据框直接转换为 JSON 字符串的数据集
val jsonDataset: Dataset[String] = df.toJSON
您可以使用
将其转换为数据框
val jsonDF: DataFrame = jsonDataset.toDF
此处 json 将按字母顺序排列,因此
的输出
jsonDF show false
将会
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{"age":"35","creditcards":[{"id":"hr6","number":"3569823"},{"id":"ee3","number":"1547869"}],"id":"1","lastname":"blanc","name":"michel","timestamp":"1496756626921"}|
|{"age":"25","creditcards":[{"id":"ye8","number":"4569872"},{"id":"qe5","number":"3485762"}],"id":"2","lastname":"barns","name":"peter","timestamp":"1496756626551"} |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+
我有一个这样的数据框:
+--+--------+--------+----+-------------+------------------------------+
|id|name |lastname|age |timestamp |creditcards |
+--+--------+--------+----+-------------+------------------------------+
|1 |michel |blanc |35 |1496756626921|[[hr6,3569823], [ee3,1547869]]|
|2 |peter |barns |25 |1496756626551|[[ye8,4569872], [qe5,3485762]]|
+--+--------+--------+----+-------------+------------------------------+
我的 df 架构如下所示:
root
|-- id: string (nullable = true)
|-- name: string (nullable = true)
|-- lastname: string (nullable = true)
|-- age: string (nullable = true)
|-- timestamp: string (nullable = true)
|-- creditcards: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- id: string (nullable = true)
| | |-- number: string (nullable = true)
我想将每一行转换为了解我的架构的 json 字符串。所以这个数据框将有一个包含 json 的列字符串。 第一行应该是这样的:
{
"id":"1",
"name":"michel",
"lastname":"blanc",
"age":"35",
"timestamp":"1496756626921",
"creditcards":[
{
"id":"hr6",
"number":"3569823"
},
{
"id":"ee3",
"number":"1547869"
}
]
}
数据帧的第二行应该是这样的:
{
"id":"2",
"name":"peter",
"lastname":"barns",
"age":"25",
"timestamp":"1496756626551",
"creditcards":[
{
"id":"ye8",
"number":"4569872"
},
{
"id":"qe5",
"number":"3485762"
}
]
}
我的目标不是将数据帧写入 json 文件。我的目标是将 df1 转换为第二个 df2,以便将 df2 的每个 json 行推送到 kafka 主题 我有这个代码来创建数据框:
val line1 = """{"id":"1","name":"michel","lastname":"blanc","age":"35","timestamp":"1496756626921","creditcards":[{"id":"hr6","number":"3569823"},{"id":"ee3","number":"1547869"}]}"""
val line2 = """{"id":"2","name":"peter","lastname":"barns","age":"25","timestamp":"1496756626551","creditcards":[{"id":"ye8","number":"4569872"}, {"id":"qe5","number":"3485762"}]}"""
val rdd = sc.parallelize(Seq(line1, line2))
val df = sqlContext.read.json(rdd)
df show false
df printSchema
你有什么想法吗?
如果您只需要一个单列 DataFrame/Dataset,每个列值代表 JSON 中原始 DataFrame 的每一行,您可以简单地将 toJSON
应用到您的 DataFrame,如下所示:
df.show
// +---+------------------------------+---+--------+------+-------------+
// |age|creditcards |id |lastname|name |timestamp |
// +---+------------------------------+---+--------+------+-------------+
// |35 |[[hr6,3569823], [ee3,1547869]]|1 |blanc |michel|1496756626921|
// |25 |[[ye8,4569872], [qe5,3485762]]|2 |barns |peter |1496756626551|
// +---+------------------------------+---+--------+------+-------------+
val dsJson = df.toJSON
// dsJson: org.apache.spark.sql.Dataset[String] = [value: string]
dsJson.show
// +--------------------------------------------------------------------------+
// |value |
// +--------------------------------------------------------------------------+
// |{"age":"35","creditcards":[{"id":"hr6","number":"3569823"},{"id":"ee3",...|
// |{"age":"25","creditcards":[{"id":"ye8","number":"4569872"},{"id":"qe5",...|
// +--------------------------------------------------------------------------+
[更新]
要将 name
添加为附加列,您可以使用 from_json
:
val result = dsJson.withColumn("name", from_json($"value", df.schema)("name"))
result.show
// +--------------------+------+
// | value| name|
// +--------------------+------+
// |{"age":"35","cred...|michel|
// |{"age":"25","cred...| peter|
// +--------------------+------+
为此,您可以使用
将数据框直接转换为 JSON 字符串的数据集val jsonDataset: Dataset[String] = df.toJSON
您可以使用
将其转换为数据框val jsonDF: DataFrame = jsonDataset.toDF
此处 json 将按字母顺序排列,因此
的输出jsonDF show false
将会
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{"age":"35","creditcards":[{"id":"hr6","number":"3569823"},{"id":"ee3","number":"1547869"}],"id":"1","lastname":"blanc","name":"michel","timestamp":"1496756626921"}|
|{"age":"25","creditcards":[{"id":"ye8","number":"4569872"},{"id":"qe5","number":"3485762"}],"id":"2","lastname":"barns","name":"peter","timestamp":"1496756626551"} |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+