根据apache spark中的最大值从键值对返回键
Returning keys from key-value pairs based on maximum value in apache spark
我是 apache spark 的新手,需要一些建议。我有一个 [String, Int] 类型的 RDD。 RDD 值是这样的:
- ("A,x",3)
- ("A,y",4)
- ("A,z",1)
- ("B,y",2)
- ("C,w",5)
- ("C,y",2)
- ("E,x",1)
- ("E,z",3)
我想要完成的是获得这样的 RDD (String,String):
- ("A","y") //在包含A的key中,(A,y)有最大值
- ("B","y") //包含B的key中,(B,y)有最大值
- ("C","w") //包含C的key中,(C,w)有最大值
- ("E","z") //在包含E的key中,(E,z)有最大值
我在 flatMap 中尝试了循环概念(通过使用计数器),但它不起作用。有没有简单的方法来做到这一点?
只需重塑 reduceByKey
:
val pattern = "^(.*?),(.*?)$".r
rdd
// Split key into parts
.flatMap{ case (pattern(x, y), z) => Some((x, (y, z))) }
// Reduce by first part of the key
.reduceByKey( (a, b) => if (a._2 > b._2) a else b )
// Go back to the original shape
.map { case (x, (y, z)) => (s"$x,$y", z) }
您可以使用 groupBy 键,然后使用 maxBy 函数来获取您的输出
val data = Array(("A,x", 3),("A,y", 4),("A,z", 1),("B,y", 2),("C,w", 5),("C,y", 2),("E,x", 1),("E,z", 3))
val rdd = sc.makeRDD(data).map(i => { // Paralleling the sample data
val t = i._1.split(",") // Splitting the String by ,
t(0) ->(t(1), i._2) // Transforming String,Int to String,(String,Int)
}).groupByKey().map(i => { // Performing a groupBy key
(i._1, i._2.maxBy(_._2)._1) // returning the Max value by the Int being passed using the maxBy function
})
rdd.foreach(println(_)) // Printing the output
我是 apache spark 的新手,需要一些建议。我有一个 [String, Int] 类型的 RDD。 RDD 值是这样的:
- ("A,x",3)
- ("A,y",4)
- ("A,z",1)
- ("B,y",2)
- ("C,w",5)
- ("C,y",2)
- ("E,x",1)
- ("E,z",3)
我想要完成的是获得这样的 RDD (String,String):
- ("A","y") //在包含A的key中,(A,y)有最大值
- ("B","y") //包含B的key中,(B,y)有最大值
- ("C","w") //包含C的key中,(C,w)有最大值
- ("E","z") //在包含E的key中,(E,z)有最大值
我在 flatMap 中尝试了循环概念(通过使用计数器),但它不起作用。有没有简单的方法来做到这一点?
只需重塑 reduceByKey
:
val pattern = "^(.*?),(.*?)$".r
rdd
// Split key into parts
.flatMap{ case (pattern(x, y), z) => Some((x, (y, z))) }
// Reduce by first part of the key
.reduceByKey( (a, b) => if (a._2 > b._2) a else b )
// Go back to the original shape
.map { case (x, (y, z)) => (s"$x,$y", z) }
您可以使用 groupBy 键,然后使用 maxBy 函数来获取您的输出
val data = Array(("A,x", 3),("A,y", 4),("A,z", 1),("B,y", 2),("C,w", 5),("C,y", 2),("E,x", 1),("E,z", 3))
val rdd = sc.makeRDD(data).map(i => { // Paralleling the sample data
val t = i._1.split(",") // Splitting the String by ,
t(0) ->(t(1), i._2) // Transforming String,Int to String,(String,Int)
}).groupByKey().map(i => { // Performing a groupBy key
(i._1, i._2.maxBy(_._2)._1) // returning the Max value by the Int being passed using the maxBy function
})
rdd.foreach(println(_)) // Printing the output