如何对 Spark 中的 RDD 和限制进行排序?
How to sort an RDD and limit in Spark?
我有 Foo class 的 RDD:class Foo( name : String, createDate : Date )
。
我想要一个旧 10% Foo
的另一个 RDD。
我的第一个想法是按createDate
排序,按0.1*count限制,但是没有限制功能
你有想法吗?
假设 Foo
是这样的情况 class:
import java.sql.Date
case class Foo(name: String, createDate: java.sql.Date)
使用纯 RDD:
import org.apache.spark.rdd.RDD
import scala.math.Ordering
val rdd: RDD[Foo] = sc
.parallelize(Seq(
("a", "2015-01-03"), ("b", "2014-11-04"), ("a", "2016-08-10"),
("a", "2013-11-11"), ("a", "2015-06-19"), ("a", "2009-11-23")))
.toDF("name", "createDate")
.withColumn("createDate", $"createDate".cast("date"))
.as[Foo].rdd
rdd.cache()
val n = scala.math.ceil(0.1 * rdd.count).toInt
数据适合驱动程序内存:
而且你要的分数比较小
rdd.takeOrdered(n)(Ordering.by[Foo, Long](_.createDate.getTime))
// Array[Foo] = Array(Foo(a,2009-11-23))
你要的分数比较大:
rdd.sortBy(_.createDate.getTime).take(n)
否则
rdd
.sortBy(_.createDate.getTime)
.zipWithIndex
.filter{case (_, idx) => idx < n}
.keys
使用 DataFrame(注意 - 由于限制行为,这实际上不是最佳性能)。
import org.apache.spark.sql.Row
val topN = rdd.toDF.orderBy($"createDate").limit(n)
topN.show
// +----+----------+
// |name|createDate|
// +----+----------+
// | a|2009-11-23|
// +----+----------+
// Optionally recreate RDD[Foo]
topN.map{case Row(name: String, date: Date) => Foo(name, date)}
我有 Foo class 的 RDD:class Foo( name : String, createDate : Date )
。
我想要一个旧 10% Foo
的另一个 RDD。
我的第一个想法是按createDate
排序,按0.1*count限制,但是没有限制功能
你有想法吗?
假设 Foo
是这样的情况 class:
import java.sql.Date
case class Foo(name: String, createDate: java.sql.Date)
使用纯 RDD:
import org.apache.spark.rdd.RDD import scala.math.Ordering val rdd: RDD[Foo] = sc .parallelize(Seq( ("a", "2015-01-03"), ("b", "2014-11-04"), ("a", "2016-08-10"), ("a", "2013-11-11"), ("a", "2015-06-19"), ("a", "2009-11-23"))) .toDF("name", "createDate") .withColumn("createDate", $"createDate".cast("date")) .as[Foo].rdd rdd.cache() val n = scala.math.ceil(0.1 * rdd.count).toInt
数据适合驱动程序内存:
而且你要的分数比较小
rdd.takeOrdered(n)(Ordering.by[Foo, Long](_.createDate.getTime)) // Array[Foo] = Array(Foo(a,2009-11-23))
你要的分数比较大:
rdd.sortBy(_.createDate.getTime).take(n)
否则
rdd .sortBy(_.createDate.getTime) .zipWithIndex .filter{case (_, idx) => idx < n} .keys
使用 DataFrame(注意 - 由于限制行为,这实际上不是最佳性能)。
import org.apache.spark.sql.Row val topN = rdd.toDF.orderBy($"createDate").limit(n) topN.show // +----+----------+ // |name|createDate| // +----+----------+ // | a|2009-11-23| // +----+----------+ // Optionally recreate RDD[Foo] topN.map{case Row(name: String, date: Date) => Foo(name, date)}