Spark dataframe sorting based on a subset of a column
I am generating the paired rdd/df through another process, but here is code that generates the dataset to help with debugging.
Here is the sample i/p file (/scratch/test2.txt):
1 book1 author1 1.10
2 book2 author2 2.20
1 book3 author2 3.30
Here is the code that generates the dataframe:
import org.apache.spark.sql.functions.{col, udf}

case class RefText (index: Int, description: String, fName: String, weight: Double)
val annotation_split = sc.textFile("/scratch/test2.txt").map(_.split("\t"))
val annotation = annotation_split.map{line => RefText(line(0).toInt, line(1), line(2), line(3).toDouble)}.toDF()
val getConcatenated = udf( (first: String, second: String, third: Double) => { first + "#" + second + "#" + third.toString} )
val annotate_concated = annotation.withColumn("annotation",getConcatenated(col("description"), col("fName"), col("weight"))).select("index","annotation")
annotate_concated.show()
+-----+-----------------+
|index| annotation|
+-----+-----------------+
| 1|book1#author1#1.1|
| 2|book2#author2#2.2|
| 1|book3#author2#3.3|
+-----+-----------------+
//Here is how I generate pairedrdd.
val paired_rdd : PairRDDFunctions[String, String] = annotate_concated.rdd.map(row => (row.getString(0), row.getString(1)))
val df = paired_rdd.reduceByKey { case (val1, val2) => val1 + "|" + val2 }.toDF("user_id","description")
Here is sample data from my dataframe; the description column has the following format (text1#text2#weight|text1#text2#weight|....):
user1
book1#author1#0.07841217886795074|tool1#desc1#1.27044260397331488|song1#album1#-2.052661673730870676|item1#category1#-0.005683148395350108
user2
book2#author1#4.07841217886795074|tool2#desc1#-1.27044260397331488|song2#album1#2.052661673730870676|item2#category1#-0.005683148395350108
I want to sort the description column in descending order of weight.
The desired o/p is:
user1
tool1#desc1#1.27044260397331488|book1#author1#0.07841217886795074|item1#category1#-0.005683148395350108|song1#album1#-2.052661673730870676
user2
book2#author1#4.07841217886795074|song2#album1#2.052661673730870676|tool2#desc1#-1.27044260397331488|item2#category1#-0.005683148395350108
Any help would be greatly appreciated.
I don't think there is a direct way to reorder the values inside a cell. I would personally do the ordering beforehand, i.e. on the annotation_split rdd.
Here is an example (I had to change the code slightly to make it work).
File on HDFS (using regular spaces and @ as separators):
1 book1 author1 1.10 @ 2 book2 author2 2.20 @ 1 book3 author2 3.30
Then:
case class RefText (index: Int, description: String, fName: String, weight: Double)
// split by line, then split line into columns
val annotation_split = sc.textFile(path).flatMap(_.split(" @ ")).map{_.split(" ")}
// HERE IS THE TRICK: sort the lines in descending order
val annotation_sorted = annotation_split
.map(line => (line.last.toFloat,line))
.sortByKey(false)
.map(_._2)
// back to your code
val annotation = annotation_sorted.map{line => RefText(line(0).toInt, line(1), line(2), line(3).toDouble)}.toDF()
val getConcatenated = udf( (first: String, second: String, third: Double) => { first + "#" + second + "#" + third.toString} )
val annotate_concated = annotation.withColumn("annotation",getConcatenated(col("description"), col("fName"), col("weight"))).select("index","annotation")
// note: here, I replaced row.getString(0) by row.getInt(0) to avoid cast exception
val paired_rdd = annotate_concated.rdd.map(row => (row.getInt(0), row.getString(1)))
val df = paired_rdd.reduceByKey { case (val1, val2) => val1 + "|" + val2 }.toDF("user_id","description")
The only problem is that, depending on your level of parallelism, the order may get shuffled afterwards. An alternative is to map over the column and rewrite each value in sorted order (split, sort, join).
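Here is a minimal sketch of that split/sort/join alternative, assuming the df with the description column produced above; the sortDesc name is mine, not from the original code:

import org.apache.spark.sql.functions.{col, udf}

// Sort each description cell in place: split on "|", order the entries by
// their trailing weight in descending order, then join them back together.
val sortDesc = udf { (description: String) =>
  description
    .split("\\|")
    .sortBy(entry => -entry.split("#").last.toDouble)
    .mkString("|")
}

val dfSorted = df.withColumn("description", sortDesc(col("description")))
dfSorted.show(false)

Since the sorting happens independently inside each cell, this version does not depend on the partitioning or parallelism of the preceding reduceByKey.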