Spark 使用相同的键创建字段数组
Spark creating array of feild with same key
我有一个位于 spark 上下文之上的配置单元 table。 table 的格式如下
| key | param1 | Param 2|
-------------------------
| A | A11 | A12 |
| B | B11 | B12 |
| A | A21 | A22 |
我想创建一个具有架构的 DataFrame
val dataSchema = new StructType(
Array(
StructField("key", StringType, nullable = true),
StructField("param", ArrayType(
StructType( Array(
StructField("param1", StringType, nullable = true),
StructField("param2", StringType, nullable = true)
)), containsNull = true), nullable = true)
)
)
从上面table
所以最后的 Table 变成
| key | param |
-------------------------------------------------------------
| A | [{param1:A11, param2:A12},{param1:A11, param2:A12}]|
| B | [{param1:B11, param2:B12}] |
我正在使用配置单元上下文(hiveContext.table("table_name"))加载 table,其中 returns 数据框。
scala> val df = hiveContext.table("sample")
df: org.apache.spark.sql.DataFrame = [fk: string, param1: string, param2: string]
scala> val dfStruct = df.select($"key", struct($"param1", $"param2").alias("param"))
dfStruct: org.apache.spark.sql.DataFrame = [fk: string, sub: struct<param1:string,param2:string>]
scala> dfStruct.show
+--+----------+
|fk| param|
+--+----------+
| A| [A11,A12]|
| B| [B11,B12]|
| A| [A21,A22]|
+--+----------+
scala>
我正在尝试使用 dataframe 使用 groupBy 转换为上述 table。但是做不到。
我找到了自己。
关键是使用 case class
而不是 structType
case class Param(param1: String, param2:String)
case class Sample(key: String, param:Array[Param])
val df = hiveContext.table("sample_sub")
val SampleDF = df.select($"fk", $"param1", $"param2")
val SampleDFMap = SampleDF.rdd.groupBy(r => r.getAs[String]("fk"))
val SampleJoinRDD = SampleDFMap.map(
r => Sample(r._1.asInstanceOf[String], r._2.map (
row => Param(row(1).asInstanceOf[String],row(2).asInstanceOf[String])
).toArray
)
)
SampleJoinRDD.toDF.toJSON.collect
// Array({"key":"A","param":[{"param1":"A11","param2":"A12"},{"param1":"A21","param2":"A22"}]}, {"key":"B","param":[{"param1":"B11","param2":"B12"}]})
我有一个位于 spark 上下文之上的配置单元 table。 table 的格式如下
| key | param1 | Param 2|
-------------------------
| A | A11 | A12 |
| B | B11 | B12 |
| A | A21 | A22 |
我想创建一个具有架构的 DataFrame
val dataSchema = new StructType(
Array(
StructField("key", StringType, nullable = true),
StructField("param", ArrayType(
StructType( Array(
StructField("param1", StringType, nullable = true),
StructField("param2", StringType, nullable = true)
)), containsNull = true), nullable = true)
)
)
从上面table
所以最后的 Table 变成
| key | param |
-------------------------------------------------------------
| A | [{param1:A11, param2:A12},{param1:A11, param2:A12}]|
| B | [{param1:B11, param2:B12}] |
我正在使用配置单元上下文(hiveContext.table("table_name"))加载 table,其中 returns 数据框。
scala> val df = hiveContext.table("sample")
df: org.apache.spark.sql.DataFrame = [fk: string, param1: string, param2: string]
scala> val dfStruct = df.select($"key", struct($"param1", $"param2").alias("param"))
dfStruct: org.apache.spark.sql.DataFrame = [fk: string, sub: struct<param1:string,param2:string>]
scala> dfStruct.show
+--+----------+
|fk| param|
+--+----------+
| A| [A11,A12]|
| B| [B11,B12]|
| A| [A21,A22]|
+--+----------+
scala>
我正在尝试使用 dataframe 使用 groupBy 转换为上述 table。但是做不到。
我找到了自己。
关键是使用 case class
而不是 structType
case class Param(param1: String, param2:String)
case class Sample(key: String, param:Array[Param])
val df = hiveContext.table("sample_sub")
val SampleDF = df.select($"fk", $"param1", $"param2")
val SampleDFMap = SampleDF.rdd.groupBy(r => r.getAs[String]("fk"))
val SampleJoinRDD = SampleDFMap.map(
r => Sample(r._1.asInstanceOf[String], r._2.map (
row => Param(row(1).asInstanceOf[String],row(2).asInstanceOf[String])
).toArray
)
)
SampleJoinRDD.toDF.toJSON.collect
// Array({"key":"A","param":[{"param1":"A11","param2":"A12"},{"param1":"A21","param2":"A22"}]}, {"key":"B","param":[{"param1":"B11","param2":"B12"}]})