在 Spark 中从嵌套的其他 DF/RDD(嵌套的 Json)创建 DF/RDD
Create DF/RDD from nested other DF/RDD (Nested Json) in Spark
我是 Spark&Scala 方面的新手,如果有人能向我解释一下就太好了。
让我们跟随JSON
{
"id": 1,
"persons": [{
"name": "n1",
"lastname": "l1",
"hobbies": [{
"name": "h1",
"activity": "a1"
},
{
"name": "h2",
"activity": "a2"
}]
},
{
"name": "n2",
"lastname": "l2",
"hobbies": [{
"name": "h3",
"activity": "a3"
},
{
"name": "h4",
"activity": "a4"
}]
}]
}
我正在通过 sc.parralelize(file.json) 将此 Json 加载到 RDD,并通过 sqlContext.sql.load.json(file.json) 加载到 DF。到目前为止一切顺利,这为我提供了提到的 Json 的 RDD 和 DF(带有模式),但我想从现有的包含所有不同 "hobbies" 记录的记录中创建另一个 RDD/DF。我怎样才能达到那样的目的?
我从我的操作中得到的唯一东西是多个 WrappedArrays for Hobbies 但我不能更深入也不能将它们分配给 DF/RDD。
我目前拥有的 SqlContext 代码
val jsonData = sqlContext.read.json("path/file.json")
jsonData.registerTempTable("jsonData") //I receive schema for whole file
val hobbies = sqlContext.sql("SELECT persons.hobbies FROM jasonData") //subschema for hobbies
hobbies.show()
这让我
+--------------------+
| hobbies|
+--------------------+
|[WrappedArray([a1...|
+--------------------+
我期望的更像是:
+--------------------+-----------------+
| name | activity |
+--------------------+-----------------|
| h1| a1 |
+--------------------+-----------------+
| h2| a2 |
+--------------------+-----------------+
| h3| a3 |
+--------------------+-----------------+
| h4| a4 |
+--------------------+-----------------+
我完全按照您的方式将您的示例加载到数据帧 hobbies
中并使用了它。您可以 运行 类似以下内容:
val distinctHobbies = hobbies.rdd.flatMap {row => row.getSeq[List[Row]](0).flatten}.map(row => (row.getString(0), row.getString(1))).distinct
val dhDF = distinctHobbies.toDF("activity", "name")
这实质上是将您的爱好结构扁平化,将其转换为一个元组,并且 运行 在返回的元组上设置一个不同的结构。然后我们将其转换回正确列别名下的数据框。因为我们是通过底层的 RDD 来做的,所以可能还有一种更有效的方法来只使用 DataFrame API。
无论如何,当我 运行 在你的例子中,我看到:
scala> val distinctHobbies = hobbies.rdd.flatMap {row => row.getSeq[List[Row]](0).flatten}.map(row => (row.getString(0), row.getString(1))).distinct
distinctHobbies: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[121] at distinct at <console>:24
scala> val dhDF = distinctHobbies.toDF("activity", "name")
dhDF: org.apache.spark.sql.DataFrame = [activity: string, name: string]
scala> dhDF.show
...
+--------+----+
|activity|name|
+--------+----+
| a2| h2|
| a1| h1|
| a3| h3|
| a4| h4|
+--------+----+
我是 Spark&Scala 方面的新手,如果有人能向我解释一下就太好了。 让我们跟随JSON
{
"id": 1,
"persons": [{
"name": "n1",
"lastname": "l1",
"hobbies": [{
"name": "h1",
"activity": "a1"
},
{
"name": "h2",
"activity": "a2"
}]
},
{
"name": "n2",
"lastname": "l2",
"hobbies": [{
"name": "h3",
"activity": "a3"
},
{
"name": "h4",
"activity": "a4"
}]
}]
}
我正在通过 sc.parralelize(file.json) 将此 Json 加载到 RDD,并通过 sqlContext.sql.load.json(file.json) 加载到 DF。到目前为止一切顺利,这为我提供了提到的 Json 的 RDD 和 DF(带有模式),但我想从现有的包含所有不同 "hobbies" 记录的记录中创建另一个 RDD/DF。我怎样才能达到那样的目的? 我从我的操作中得到的唯一东西是多个 WrappedArrays for Hobbies 但我不能更深入也不能将它们分配给 DF/RDD。
我目前拥有的 SqlContext 代码
val jsonData = sqlContext.read.json("path/file.json")
jsonData.registerTempTable("jsonData") //I receive schema for whole file
val hobbies = sqlContext.sql("SELECT persons.hobbies FROM jasonData") //subschema for hobbies
hobbies.show()
这让我
+--------------------+
| hobbies|
+--------------------+
|[WrappedArray([a1...|
+--------------------+
我期望的更像是:
+--------------------+-----------------+
| name | activity |
+--------------------+-----------------|
| h1| a1 |
+--------------------+-----------------+
| h2| a2 |
+--------------------+-----------------+
| h3| a3 |
+--------------------+-----------------+
| h4| a4 |
+--------------------+-----------------+
我完全按照您的方式将您的示例加载到数据帧 hobbies
中并使用了它。您可以 运行 类似以下内容:
val distinctHobbies = hobbies.rdd.flatMap {row => row.getSeq[List[Row]](0).flatten}.map(row => (row.getString(0), row.getString(1))).distinct
val dhDF = distinctHobbies.toDF("activity", "name")
这实质上是将您的爱好结构扁平化,将其转换为一个元组,并且 运行 在返回的元组上设置一个不同的结构。然后我们将其转换回正确列别名下的数据框。因为我们是通过底层的 RDD 来做的,所以可能还有一种更有效的方法来只使用 DataFrame API。
无论如何,当我 运行 在你的例子中,我看到:
scala> val distinctHobbies = hobbies.rdd.flatMap {row => row.getSeq[List[Row]](0).flatten}.map(row => (row.getString(0), row.getString(1))).distinct
distinctHobbies: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[121] at distinct at <console>:24
scala> val dhDF = distinctHobbies.toDF("activity", "name")
dhDF: org.apache.spark.sql.DataFrame = [activity: string, name: string]
scala> dhDF.show
...
+--------+----+
|activity|name|
+--------+----+
| a2| h2|
| a1| h1|
| a3| h3|
| a4| h4|
+--------+----+