有效地为 spark 实现 takeByKey
efficiently implementing takeByKey for spark
我有一个 RDD[(k:Int,v:String)]
类型的 RDD。我想为每个键 k
使用最多 1000 个元组,这样我就有了 [(k,v)]
,其中没有键出现超过 1000 次。 有没有一种方法可以避免先调用 groupBy 的性能损失?我想不出一种聚合值的好方法,以避免导致我的工作失败的完整 groupBy。
天真的方法:
def takeByKey(rdd: RDD[(K,V)], n: Int) : RDD[(K,V)] = {
rdd.groupBy(_._1).mapValues(_.take(n)).flatMap(_._2)
}
我正在寻找一种更有效的方法来避免 groupBy:
takeByKey(rdd: RDD[(K,V)], n: Int) : RDD[(K,V)] = {
//use reduceByKey, foldByKey, etc..??
}
这是迄今为止我开发的最好的解决方案,但它没有类型检查..
def takeByKey(rdd: RDD[(K,V)], n: Int) : RDD[(K,V)] = {
rdd.foldByKey(List[V](), ((acc, elem) => if (acc.length >= n) acc else elem._2 :: acc)).flatMap(t => t._2.map(v => (t._1, v)))
}
编辑。
我提出了一个似乎有效的稍微好一点的解决方案:
takeByKey(rdd: RDD[(K,V)], n: Int) : RDD[(K,V)] = {
rdd.mapValues(List(_))
.reduceByKey((x,y) => if(x.length >= n) x
else if(y.length >= n) y
else (x ++ y).take(n))
.flatMap(t => t._2.map(v => (t._1, v)))
}
这是迄今为止我想出的最佳解决方案
takeByKey(rdd: RDD[(K,V)], n: Int) : RDD[(K,V)] = {
rdd.mapValues(List(_))
.reduceByKey((x,y) => if(x.length >= n) x
else if(y.length >= n) y
else (x ++ y).take(n))
.flatMap(t => t._2.map(v => (t._1, v)))
}
它不会像 groupByKey 方法那样 运行 内存不足而死掉,但它仍然很慢。
您当前的解决方案是朝着正确方向迈出的一步,但至少出于以下三个原因,它仍然效率很低:
mapValues(List(_))
创建大量临时 List
对象
length
对于线性 Seq
就像 List
是 O(N)
x ++ y
再次创建大量临时对象
您可以包含的最简单的方法是将 List
替换为具有恒定时间 length
的可变缓冲区。一个可能的例子是这样的:
import scala.collection.mutable.ArrayBuffer
rdd.aggregateByKey(ArrayBuffer[Int]())(
(acc, x) => if (acc.length >= n) acc else acc += x,
(acc1, acc2) => {
val (xs, ys) = if (acc1.length > acc2.length) (acc1, acc2) else (acc2, acc1)
val toTake = Math.min(n - xs.length, ys.length)
for (i <- 0 until toTake) {
xs += ys(i)
}
xs
}
)
在旁注中,您可以替换:
.flatMap(t => t._2.map(v => (t._1, v)))
和
.flatMapValues(x => x) // identity[Seq[V]]
它不会影响性能,但它稍微干净一些。
我有一个 RDD[(k:Int,v:String)]
类型的 RDD。我想为每个键 k
使用最多 1000 个元组,这样我就有了 [(k,v)]
,其中没有键出现超过 1000 次。 有没有一种方法可以避免先调用 groupBy 的性能损失?我想不出一种聚合值的好方法,以避免导致我的工作失败的完整 groupBy。
天真的方法:
def takeByKey(rdd: RDD[(K,V)], n: Int) : RDD[(K,V)] = {
rdd.groupBy(_._1).mapValues(_.take(n)).flatMap(_._2)
}
我正在寻找一种更有效的方法来避免 groupBy:
takeByKey(rdd: RDD[(K,V)], n: Int) : RDD[(K,V)] = {
//use reduceByKey, foldByKey, etc..??
}
这是迄今为止我开发的最好的解决方案,但它没有类型检查..
def takeByKey(rdd: RDD[(K,V)], n: Int) : RDD[(K,V)] = {
rdd.foldByKey(List[V](), ((acc, elem) => if (acc.length >= n) acc else elem._2 :: acc)).flatMap(t => t._2.map(v => (t._1, v)))
}
编辑。 我提出了一个似乎有效的稍微好一点的解决方案:
takeByKey(rdd: RDD[(K,V)], n: Int) : RDD[(K,V)] = {
rdd.mapValues(List(_))
.reduceByKey((x,y) => if(x.length >= n) x
else if(y.length >= n) y
else (x ++ y).take(n))
.flatMap(t => t._2.map(v => (t._1, v)))
}
这是迄今为止我想出的最佳解决方案
takeByKey(rdd: RDD[(K,V)], n: Int) : RDD[(K,V)] = {
rdd.mapValues(List(_))
.reduceByKey((x,y) => if(x.length >= n) x
else if(y.length >= n) y
else (x ++ y).take(n))
.flatMap(t => t._2.map(v => (t._1, v)))
}
它不会像 groupByKey 方法那样 运行 内存不足而死掉,但它仍然很慢。
您当前的解决方案是朝着正确方向迈出的一步,但至少出于以下三个原因,它仍然效率很低:
mapValues(List(_))
创建大量临时List
对象length
对于线性Seq
就像List
是 O(N)x ++ y
再次创建大量临时对象
您可以包含的最简单的方法是将 List
替换为具有恒定时间 length
的可变缓冲区。一个可能的例子是这样的:
import scala.collection.mutable.ArrayBuffer
rdd.aggregateByKey(ArrayBuffer[Int]())(
(acc, x) => if (acc.length >= n) acc else acc += x,
(acc1, acc2) => {
val (xs, ys) = if (acc1.length > acc2.length) (acc1, acc2) else (acc2, acc1)
val toTake = Math.min(n - xs.length, ys.length)
for (i <- 0 until toTake) {
xs += ys(i)
}
xs
}
)
在旁注中,您可以替换:
.flatMap(t => t._2.map(v => (t._1, v)))
和
.flatMapValues(x => x) // identity[Seq[V]]
它不会影响性能,但它稍微干净一些。