对 RDD 中的键进行排序
Sorting keys in an RDD
我需要对 RDD 中的键进行排序,但没有自然排序顺序(不是升序或降序)。我什至不知道如何编写比较器来完成它。假设我有一张苹果、梨、橙子和葡萄的地图,我想按橙子、苹果、葡萄和梨排序。
关于如何在 Spark/Scala 中执行此操作的任何想法?谢谢!
我不知道 spark,但如果使用纯 Scala 集合,那将是
_.sortBy(_.fruitType)
例如,
val l: List[String] = List("the", "big", "bang")
val sortedByFirstLetter = l.sortBy(_.head)
// List(big, bang, the)
Spark 中有一个 sortBy
方法,允许您定义任意顺序以及是否要升序或降序。例如。
scala> val rdd = sc.parallelize(Seq ( ("a", 1), ("z", 7), ("p", 3), ("a", 13) ))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[331] at parallelize at <console>:70
scala> rdd.sortBy( _._2, ascending = false) .collect.mkString("\n")
res34: String =
(a,13)
(z,7)
(p,3)
(a,1)
scala> rdd.sortBy( _._1, ascending = false) .collect.mkString("\n")
res35: String =
(z,7)
(p,3)
(a,1)
(a,13)
scala> rdd.sortBy
def sortBy[K](f: T => K, ascending: Boolean, numPartitions: Int)(implicit ord: scala.math.Ordering[K], ctag: scala.reflect.ClassTag[K]): RDD[T]
最后一部分告诉你sortBy
的签名是什么。前面示例中使用的排序是按对的第一部分和第二部分进行的。
编辑:回答太快了,没有检查你的问题,抱歉......无论如何,你会像你的例子一样定义你的顺序:
def myord(fruit:String) = fruit match {
case "oranges" => 1 ;
case "apples" => 2;
case "grapes" =>3;
case "pears" => 4;
case _ => 5}
val rdd = sc.parallelize(Seq("apples", "oranges" , "pears", "grapes" , "other") )
那么,排序的结果就是:
scala> rdd.sortBy[Int](myord, ascending = true).collect.mkString("\n")
res1: String =
oranges
apples
grapes
pears
other
如果描述顺序的唯一方法是枚举,那么只需枚举:
val order = Map("orange" -> 0L, "apple" -> 1L, "grape" -> 2L, "pear" -> 3L)
val rdd = sc.parallelize(Seq(("grape", 0.3), ("apple", 5.0), ("orange", 5.6)))
val sorted = rdd.sortBy{case (key, _) => order.getOrElse(key, Long.MaxValue)}
sorted.collect
// Array[(String, Double)] = Array((orange,5.6), (apple,5.0), (grape,0.3))
在 Scala 中,您需要寻找 Ordering[T]
特征而不是 Comparator
接口——主要是表面上的差异,因此重点是数据的属性而不是事物它比较数据的两个实例。实现特征需要定义 compare(T,T)
方法。枚举比较的一个非常明确的版本可以是:
object fruitOrdering extends Ordering[String] {
def compare(lhs: String, rhs: String): Int = (lhs, rhs) match {
case ("orange", "orange") => 0
case ("orange", _) => -1
case ("apple", "orange") => 1
case ("apple", "apple") => 0
case ("apple", _) => -1
case ("grape", "orange") => 1
case ("grape", "apple") => 1
case ("grape", "grape") => 0
case ("grape", _) => -1
case ("pear", "orange") => 1
case ("pear", "apple") => 1
case ("pear", "grape") => 1
case ("pear", "pear") => 0
case ("pear", _) => -1
case _ => 0
}
}
或者,稍微适应一下:
object fruitOrdering2 extends Ordering[String] {
private val values = Seq("orange", "apple", "grape", "pear")
// generate the map based off of indices so we don't have to worry about human error during updates
private val ordinalMap = values.zipWithIndex.toMap.withDefaultValue(Int.MaxValue)
def compare(lhs: String, rhs: String): Int = ordinalMap(lhs).compare(ordinalMap(rhs))
}
现在您有了 Ordering[String]
的实例,您需要通知 sortBy
方法使用此顺序而不是内置顺序。如果您查看 RDD#sortBy
的签名,您会看到完整的签名是
def sortBy[K](f: (T) ⇒ K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
第二个参数列表中隐含的 Ordering[K]
通常由编译器查找预定义的顺序——这就是它知道自然顺序应该是什么的方式。然而,任何隐式参数都可以被赋予显式值。请注意,如果您提供一个隐式值,那么您需要提供所有的值,因此在这种情况下,我们还需要提供 ClassTag[K]
。这总是由编译器生成,但可以使用 scala.reflect.classTag
.
轻松显式生成
指定所有这些,调用将如下所示:
import scala.reflect.classTag
rdd.sortBy { case (key, _) => key }(fruitOrdering, classOf[String])
不过还是很乱,不是吗?幸运的是,我们可以使用隐式 类 来消除很多麻烦。这是我经常使用的片段:
package com.example.spark
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD
package object implicits {
implicit class RichSortingRDD[A : ClassTag](underlying: RDD[A]) {
def sorted(implicit ord: Ordering[A]): RDD[A] =
underlying.sortBy(identity)(ord, implicitly[ClassTag[A]])
def sortWith(fn: (A, A) => Int): RDD[A] = {
val ord = new Ordering[A] { def compare(lhs: A, rhs: A): Int = fn(lhs, rhs) }
sorted(ord)
}
}
implicit class RichSortingPairRDD[K : ClassTag, V](underlying: RDD[(K, V)]) {
def sortByKey(implicit ord: Ordering[K]): RDD[(K, V)] =
underlying.sortBy { case (key, _) => key } (ord, implicitly[ClassTag[K]])
def sortByKeyWith(fn: (K, K) => Int): RDD[(K, V)] = {
val ord = new Ordering[K] { def compare(lhs: K, rhs: K): Int = fn(lhs, rhs) }
sortByKey(ord)
}
}
}
并在行动中:
import com.example.spark.implicits._
val rdd = sc.parallelize(Seq(("grape", 0.3), ("apple", 5.0), ("orange", 5.6)))
rdd.sortByKey(fruitOrdering).collect
// Array[(String, Double)] = Array((orange,5.6), (apple,5.0), (grape,0.3))
rdd.sortByKey.collect // Natural ordering by default
// Array[(String, Double)] = Array((apple,5.0), (grape,0.3), (orange,5.6))
rdd.sortWith(_._2 compare _._2).collect // sort by the value instead
// Array[(String, Double)] = Array((grape,0.3), (apple,5.0), (orange,5.6))
我需要对 RDD 中的键进行排序,但没有自然排序顺序(不是升序或降序)。我什至不知道如何编写比较器来完成它。假设我有一张苹果、梨、橙子和葡萄的地图,我想按橙子、苹果、葡萄和梨排序。
关于如何在 Spark/Scala 中执行此操作的任何想法?谢谢!
我不知道 spark,但如果使用纯 Scala 集合,那将是
_.sortBy(_.fruitType)
例如,
val l: List[String] = List("the", "big", "bang")
val sortedByFirstLetter = l.sortBy(_.head)
// List(big, bang, the)
Spark 中有一个 sortBy
方法,允许您定义任意顺序以及是否要升序或降序。例如。
scala> val rdd = sc.parallelize(Seq ( ("a", 1), ("z", 7), ("p", 3), ("a", 13) ))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[331] at parallelize at <console>:70
scala> rdd.sortBy( _._2, ascending = false) .collect.mkString("\n")
res34: String =
(a,13)
(z,7)
(p,3)
(a,1)
scala> rdd.sortBy( _._1, ascending = false) .collect.mkString("\n")
res35: String =
(z,7)
(p,3)
(a,1)
(a,13)
scala> rdd.sortBy
def sortBy[K](f: T => K, ascending: Boolean, numPartitions: Int)(implicit ord: scala.math.Ordering[K], ctag: scala.reflect.ClassTag[K]): RDD[T]
最后一部分告诉你sortBy
的签名是什么。前面示例中使用的排序是按对的第一部分和第二部分进行的。
编辑:回答太快了,没有检查你的问题,抱歉......无论如何,你会像你的例子一样定义你的顺序:
def myord(fruit:String) = fruit match {
case "oranges" => 1 ;
case "apples" => 2;
case "grapes" =>3;
case "pears" => 4;
case _ => 5}
val rdd = sc.parallelize(Seq("apples", "oranges" , "pears", "grapes" , "other") )
那么,排序的结果就是:
scala> rdd.sortBy[Int](myord, ascending = true).collect.mkString("\n")
res1: String =
oranges
apples
grapes
pears
other
如果描述顺序的唯一方法是枚举,那么只需枚举:
val order = Map("orange" -> 0L, "apple" -> 1L, "grape" -> 2L, "pear" -> 3L)
val rdd = sc.parallelize(Seq(("grape", 0.3), ("apple", 5.0), ("orange", 5.6)))
val sorted = rdd.sortBy{case (key, _) => order.getOrElse(key, Long.MaxValue)}
sorted.collect
// Array[(String, Double)] = Array((orange,5.6), (apple,5.0), (grape,0.3))
在 Scala 中,您需要寻找 Ordering[T]
特征而不是 Comparator
接口——主要是表面上的差异,因此重点是数据的属性而不是事物它比较数据的两个实例。实现特征需要定义 compare(T,T)
方法。枚举比较的一个非常明确的版本可以是:
object fruitOrdering extends Ordering[String] {
def compare(lhs: String, rhs: String): Int = (lhs, rhs) match {
case ("orange", "orange") => 0
case ("orange", _) => -1
case ("apple", "orange") => 1
case ("apple", "apple") => 0
case ("apple", _) => -1
case ("grape", "orange") => 1
case ("grape", "apple") => 1
case ("grape", "grape") => 0
case ("grape", _) => -1
case ("pear", "orange") => 1
case ("pear", "apple") => 1
case ("pear", "grape") => 1
case ("pear", "pear") => 0
case ("pear", _) => -1
case _ => 0
}
}
或者,稍微适应一下
object fruitOrdering2 extends Ordering[String] {
private val values = Seq("orange", "apple", "grape", "pear")
// generate the map based off of indices so we don't have to worry about human error during updates
private val ordinalMap = values.zipWithIndex.toMap.withDefaultValue(Int.MaxValue)
def compare(lhs: String, rhs: String): Int = ordinalMap(lhs).compare(ordinalMap(rhs))
}
现在您有了 Ordering[String]
的实例,您需要通知 sortBy
方法使用此顺序而不是内置顺序。如果您查看 RDD#sortBy
的签名,您会看到完整的签名是
def sortBy[K](f: (T) ⇒ K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
第二个参数列表中隐含的 Ordering[K]
通常由编译器查找预定义的顺序——这就是它知道自然顺序应该是什么的方式。然而,任何隐式参数都可以被赋予显式值。请注意,如果您提供一个隐式值,那么您需要提供所有的值,因此在这种情况下,我们还需要提供 ClassTag[K]
。这总是由编译器生成,但可以使用 scala.reflect.classTag
.
指定所有这些,调用将如下所示:
import scala.reflect.classTag
rdd.sortBy { case (key, _) => key }(fruitOrdering, classOf[String])
不过还是很乱,不是吗?幸运的是,我们可以使用隐式 类 来消除很多麻烦。这是我经常使用的片段:
package com.example.spark
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD
package object implicits {
implicit class RichSortingRDD[A : ClassTag](underlying: RDD[A]) {
def sorted(implicit ord: Ordering[A]): RDD[A] =
underlying.sortBy(identity)(ord, implicitly[ClassTag[A]])
def sortWith(fn: (A, A) => Int): RDD[A] = {
val ord = new Ordering[A] { def compare(lhs: A, rhs: A): Int = fn(lhs, rhs) }
sorted(ord)
}
}
implicit class RichSortingPairRDD[K : ClassTag, V](underlying: RDD[(K, V)]) {
def sortByKey(implicit ord: Ordering[K]): RDD[(K, V)] =
underlying.sortBy { case (key, _) => key } (ord, implicitly[ClassTag[K]])
def sortByKeyWith(fn: (K, K) => Int): RDD[(K, V)] = {
val ord = new Ordering[K] { def compare(lhs: K, rhs: K): Int = fn(lhs, rhs) }
sortByKey(ord)
}
}
}
并在行动中:
import com.example.spark.implicits._
val rdd = sc.parallelize(Seq(("grape", 0.3), ("apple", 5.0), ("orange", 5.6)))
rdd.sortByKey(fruitOrdering).collect
// Array[(String, Double)] = Array((orange,5.6), (apple,5.0), (grape,0.3))
rdd.sortByKey.collect // Natural ordering by default
// Array[(String, Double)] = Array((apple,5.0), (grape,0.3), (orange,5.6))
rdd.sortWith(_._2 compare _._2).collect // sort by the value instead
// Array[(String, Double)] = Array((grape,0.3), (apple,5.0), (orange,5.6))