Spark - Map flat dataframe to a configurable nested json schema
I have a flat dataframe with 5-6 columns. I want to nest them and convert it into a nested dataframe so that I can then write it to Parquet format.
However, I don't want to use case classes, as I'm trying to keep the code as configurable as possible. I'm stuck on this part and need some help.
My input:
ID ID-2 Count(apple) Count(banana) Count(potato) Count(Onion)
1 23 1 0 2 0
2 23 0 1 0 1
2 29 1 0 1 0
My output:
Row 1:
{
"id": 1,
"ID-2": 23,
"fruits": {
"count of apple": 1,
"count of banana": 0
},
"vegetables": {
"count of potato": 2,
"count of onion": 0
}
}
I tried using the "map" function on the Spark dataframe, mapping my values to a case class. However, I want to be able to work with the names of the fields, and they may also change.
I don't want to maintain a case class and map rows to SQL column names, since that would require a code change every time.
I was thinking of maintaining a hashmap of the column names I want to keep, keyed by the dataframe's column names. For example, in the sample I would map "Count(apple)" to "count of apple". However, I can't think of a good, simple way to pass my schema in as configuration and then map it inside my code.
:: (double colon) is the "cons" operator for Scala lists.
It is how you create a Scala List, or prepend an element to an existing list. Note that List is immutable, so prepending returns a new list.
scala> val aList = 24 :: 34 :: 56 :: Nil
aList: List[Int] = List(24, 34, 56)
scala> 99 :: aList
res3: List[Int] = List(99, 24, 34, 56)
In the first example, Nil is the empty list and acts as the tail of the right-most cons operation.
However:
scala> val anotherList = 23 :: 34
<console>:12: error: value :: is not a member of Int
val anotherList = 23 :: 34
This throws an error because there is no existing list to prepend to: 34 is an Int, not a List.
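The fix is to terminate the expression with Nil (or any existing list):
scala> val anotherList = 23 :: 34 :: Nil
anotherList: List[Int] = List(23, 34)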
Here is a way to create the column mapping with the Scala Map type, using the following dataset:
// toDF requires the Spark session's implicits in scope
import spark.implicits._

val df = Seq(
  (1, 23, 1, 0, 2, 0),
  (2, 23, 0, 1, 0, 1),
  (2, 29, 1, 0, 1, 0)).toDF("ID", "ID-2", "count(apple)", "count(banana)", "count(potato)", "count(onion)")
First we declare the mapping, using a scala.collection.immutable.Map collection, along with a function responsible for applying it:
import org.apache.spark.sql.{Column, DataFrame}
val colMapping = Map(
"count(banana)" -> "no of banana",
"count(apple)" -> "no of apples",
"count(potato)" -> "no of potatos",
"count(onion)" -> "no of onions")
def mapColumns(colsMapping: Map[String, String], df: DataFrame) : DataFrame = {
val mapping = df
.columns
.map{ c => if (colsMapping.contains(c)) df(c).alias(colsMapping(c)) else df(c)}
.toList
df.select(mapping:_*)
}
The function iterates over the columns of the given dataframe and identifies the ones that share a key with the mapping. It then returns the columns with their names changed (aliased) according to that mapping.
Output of mapColumns(colMapping, df).show(false):
+---+----+------------+------------+-------------+------------+
|ID |ID-2|no of apples|no of banana|no of potatos|no of onions|
+---+----+------------+------------+-------------+------------+
|1 |23 |1 |0 |2 |0 |
|2 |23 |0 |1 |0 |1 |
|2 |29 |1 |0 |1 |0 |
+---+----+------------+------------+-------------+------------+
Finally, we generate fruits and vegetables via the struct type:
df1.withColumn("fruits", struct(col(colMapping("count(banana)")), col(colMapping("count(apple)"))))
.withColumn("vegetables", struct(col(colMapping("count(potato)")), col(colMapping("count(onion)"))))
.drop(colMapping.values.toList:_*)
.toJSON
.show(false)
Note that we drop all the columns listed in the colMapping collection once the transformation is complete.
Output:
+-----------------------------------------------------------------------------------------------------------------+
|value |
+-----------------------------------------------------------------------------------------------------------------+
|{"ID":1,"ID-2":23,"fruits":{"no of banana":0,"no of apples":1},"vegetables":{"no of potatos":2,"no of onions":0}}|
|{"ID":2,"ID-2":23,"fruits":{"no of banana":1,"no of apples":0},"vegetables":{"no of potatos":0,"no of onions":1}}|
|{"ID":2,"ID-2":29,"fruits":{"no of banana":0,"no of apples":1},"vegetables":{"no of potatos":1,"no of onions":0}}|
+-----------------------------------------------------------------------------------------------------------------+
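And since the original goal was Parquet rather than JSON, the same nested dataframe can be written out directly before the toJSON step. A sketch, with an assumed output path:

val nested = df1
  .withColumn("fruits", struct(col(colMapping("count(banana)")), col(colMapping("count(apple)"))))
  .withColumn("vegetables", struct(col(colMapping("count(potato)")), col(colMapping("count(onion)"))))
  .drop(colMapping.values.toList:_*)

// Parquet preserves the nested struct columns as-is.
nested.write.mode("overwrite").parquet("/tmp/nested-output")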
import org.apache.spark.sql.functions.{collect_list, struct}

val df = spark.read.option("header", "true").csv("/sampleinput.txt")

val df1 = df
  .withColumn("fruits", struct("Count(apple)", "Count(banana)"))
  .withColumn("vegetables", struct("Count(potato)", "Count(Onion)"))
  .groupBy("ID", "ID-2")
  .agg(collect_list("fruits") as "fruits", collect_list("vegetables") as "vegetables")
  .toJSON
df1.take(1)
Output:
{"ID":"2","ID-2":"23","fruits":[{"Count(apple)":"0","Count(banana)":"1"}],"vegetables":[{"Count(potato)":"0","Count(Onion)":"1"}]}