在 Spark 中从嵌套的其他 DF/RDD(嵌套的 Json)创建 DF/RDD

Create DF/RDD from nested other DF/RDD (Nested Json) in Spark

我是 Spark&Scala 方面的新手,如果有人能向我解释一下就太好了。 让我们跟随JSON

    {
    "id": 1,
    "persons": [{
        "name": "n1",
        "lastname": "l1",
        "hobbies": [{
            "name": "h1",
            "activity": "a1"
        },
        {
            "name": "h2",
            "activity": "a2"
        }]
    },
    {
        "name": "n2",
        "lastname": "l2",
        "hobbies": [{
            "name": "h3",
            "activity": "a3"
        },
        {
            "name": "h4",
            "activity": "a4"
        }]
    }]
    }

我正在通过 sc.parralelize(file.json) 将此 Json 加载到 RDD,并通过 sqlContext.sql.load.json(file.json) 加载到 DF。到目前为止一切顺利,这为我提供了提到的 Json 的 RDD 和 DF(带有模式),但我想从现有的包含所有不同 "hobbies" 记录的记录中创建另一个 RDD/DF。我怎样才能达到那样的目的? 我从我的操作中得到的唯一东西是多个 WrappedArrays for Hobbies 但我不能更深入也不能将它们分配给 DF/RDD。

我目前拥有的 SqlContext 代码

    val jsonData = sqlContext.read.json("path/file.json")
    jsonData.registerTempTable("jsonData") //I receive schema for whole file
    val hobbies = sqlContext.sql("SELECT persons.hobbies FROM jasonData") //subschema for hobbies
    hobbies.show()

这让我

    +--------------------+
    |             hobbies|
    +--------------------+
    |[WrappedArray([a1...|
    +--------------------+

我期望的更像是:

    +--------------------+-----------------+
    |             name   |        activity |
    +--------------------+-----------------|
    |                  h1|              a1 |
    +--------------------+-----------------+
    |                  h2|              a2 |
    +--------------------+-----------------+
    |                  h3|              a3 |
    +--------------------+-----------------+
    |                  h4|              a4 |
    +--------------------+-----------------+

我完全按照您的方式将您的示例加载到数据帧 hobbies 中并使用了它。您可以 运行 类似以下内容:

val distinctHobbies = hobbies.rdd.flatMap {row => row.getSeq[List[Row]](0).flatten}.map(row => (row.getString(0), row.getString(1))).distinct
val dhDF = distinctHobbies.toDF("activity", "name")

这实质上是将您的爱好结构扁平化,将其转换为一个元组,并且 运行 在返回的元组上设置一个不同的结构。然后我们将其转换回正确列别名下的数据框。因为我们是通过底层的 RDD 来做的,所以可能还有一种更有效的方法来只使用 DataFrame API。

无论如何,当我 运行 在你的例子中,我看到:

scala> val distinctHobbies = hobbies.rdd.flatMap {row => row.getSeq[List[Row]](0).flatten}.map(row => (row.getString(0), row.getString(1))).distinct
distinctHobbies: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[121] at distinct at <console>:24

scala> val dhDF = distinctHobbies.toDF("activity", "name")
dhDF: org.apache.spark.sql.DataFrame = [activity: string, name: string]

scala> dhDF.show
...
+--------+----+
|activity|name|
+--------+----+
|      a2|  h2|
|      a1|  h1|
|      a3|  h3|
|      a4|  h4|
+--------+----+