Processing a map structure using Spark
I have a file containing a map structure that needs to be processed. I am using the code below and get an intermediate result of type RDD[Row].
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("student-example").setMaster("local")
val sc = new SparkContext(conf)
val sqlcontext = new org.apache.spark.sql.SQLContext(sc)
// Note the escaped backslash in the Windows path; "C:\student_marks.parquet" would not compile
val studentdataframe = sqlcontext.read.parquet("C:\\student_marks.parquet")
studentdataframe.take(4).foreach(println)
The data looks like this:
[("Name=aaa","sub=math",Map("weekly" -> Array(25,24,23),"quaterly" -> Array(25,20,19),"annual" -> Array(90,95,97)),"2018-02-03")],
[("Name=bbb","sub=science",Map("weekly" -> Array(25,24,23),"quaterly" -> Array(25,20,19)),"2018-02-03")],
[("Name=ccc","sub=math",Map("weekly" -> Array(20,21,18),"quaterly" -> Array(25,16,25)),"2018-02-03")],
[("Name=ddd","sub=math",Map("weekly" -> Array(25,24,23),"quaterly" -> Array(21,19,15),"annual" -> Array(91,86,64)),"2018-02-03")]
The data is in RDD[Row] format. Here I only want the sum of the annual marks, and I want to skip a record if there are no annual marks in it. I want output like this:
Name=aaa|sub=math|282
Name=ddd|sub=math|241
Please help me.
You can achieve your requirement with a udf function, without even needing to convert to an rdd.
I used your given sample data to form a test dataframe:
import sqlcontext.implicits._ // needed for .toDF on a Seq

val studentdataframe = Seq(
  ("Name=aaa","sub=math",Map("weekly" -> Array(25,24,23),"quaterly" -> Array(25,20,19),"annual" -> Array(90,95,97)),"2018-02-03"),
  ("Name=bbb","sub=science",Map("weekly" -> Array(25,24,23),"quaterly" -> Array(25,20,19)),"2018-02-03"),
  ("Name=ccc","sub=math",Map("weekly" -> Array(20,21,18),"quaterly" -> Array(25,16,25)),"2018-02-03"),
  ("Name=ddd","sub=math",Map("weekly" -> Array(25,24,23),"quaterly" -> Array(21,19,15),"annual" -> Array(91,86,64)),"2018-02-03")
).toDF("name", "sub", "marks", "date")
This gives me:
+--------+-----------+-----------------------------------------------------------------------------------------------------------------+----------+
|name |sub |marks |date |
+--------+-----------+-----------------------------------------------------------------------------------------------------------------+----------+
|Name=aaa|sub=math |Map(weekly -> WrappedArray(25, 24, 23), quaterly -> WrappedArray(25, 20, 19), annual -> WrappedArray(90, 95, 97))|2018-02-03|
|Name=bbb|sub=science|Map(weekly -> WrappedArray(25, 24, 23), quaterly -> WrappedArray(25, 20, 19)) |2018-02-03|
|Name=ccc|sub=math |Map(weekly -> WrappedArray(20, 21, 18), quaterly -> WrappedArray(25, 16, 25)) |2018-02-03|
|Name=ddd|sub=math |Map(weekly -> WrappedArray(25, 24, 23), quaterly -> WrappedArray(21, 19, 15), annual -> WrappedArray(91, 86, 64))|2018-02-03|
+--------+-----------+-----------------------------------------------------------------------------------------------------------------+----------+
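For context, the udf's input type has to match the Spark SQL schema of the marks column. A quick way to check it is printSchema; for this test dataframe the inferred schema should look roughly like this (a sketch, the exact nullability flags may differ):

studentdataframe.printSchema()
// root
//  |-- name: string (nullable = true)
//  |-- sub: string (nullable = true)
//  |-- marks: map (nullable = true)
//  |    |-- key: string
//  |    |-- value: array (valueContainsNull = true)
//  |    |    |-- element: integer (containsNull = false)
//  |-- date: string (nullable = true)

A map<string, array<int>> column is handed to a Scala udf as Map[String, WrappedArray[Int]], which is why the function below takes that type.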
As I mentioned, a simple udf function should satisfy your requirement, so the udf function can be defined as below:
import org.apache.spark.sql.functions._
// Sum the "annual" array if the key is present; return 0 otherwise
def sumAnnual = udf((marks: Map[String, collection.mutable.WrappedArray[Int]]) =>
  if (marks.contains("annual")) marks("annual").sum else 0)
And you can use it as below:
studentdataframe.select(col("name"), col("sub"), sumAnnual(col("marks")).as("sum")).filter(col("sum") =!= 0).show(false)
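One caveat with this approach: filtering on sum =!= 0 would also drop a record whose annual marks genuinely sum to 0. If that matters for your data, a variant (a sketch, not required for the sample data above) is to return an Option so that a missing key becomes null, and filter on nullability instead:

// None is written out as null, so a true zero sum survives the filter
def sumAnnualOpt = udf((marks: Map[String, collection.mutable.WrappedArray[Int]]) =>
  if (marks.contains("annual")) Some(marks("annual").sum) else None)

studentdataframe
  .select(col("name"), col("sub"), sumAnnualOpt(col("marks")).as("sum"))
  .filter(col("sum").isNotNull)
  .show(false)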
This will give you the required dataframe:
+--------+--------+---+
|name |sub |sum|
+--------+--------+---+
|Name=aaa|sub=math|282|
|Name=ddd|sub=math|241|
+--------+--------+---+
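As a side note, if you are on Spark 2.4 or later you could avoid the udf entirely with built-in functions. A minimal sketch, assuming the same column names: looking up a missing map key yields null, so filter on that and sum the array with the aggregate higher-order function.

// Keep only rows whose map actually has an "annual" entry,
// then fold the annual array into a sum without any udf
studentdataframe
  .filter(col("marks")("annual").isNotNull)
  .select(
    col("name"),
    col("sub"),
    expr("aggregate(marks['annual'], 0, (acc, x) -> acc + x)").as("sum"))
  .show(false)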
I hope the answer is helpful.