在火花中中止地图执行
Abort map execution in spark
如何在 spark 中中断地图作业:
val rddFiltered = rdd.map(value => value.toInt).map(value => {
if (value == 0)
// if the condition is true ,
// stop the map execution and return the processed RDD
value
})
假设我的 rdd 是:3,4,7,1,3,0,4,6
我要3,4,7,1,3
这不是您可以使用 Spark 有效解决的问题。无法保证处理分区的顺序,并且不存在部分转换之类的东西。
如果您希望预期的记录数量足够小以供驱动程序处理,您可以通过迭代 runJob
实现与 take
类似的方法并收集分区,只要您找不到第一个包含满足谓词的值的值。
或者,以完整数据扫描为代价:
def takeWhile[T : ClassTag](rdd: RDD[T])(pred: T => Boolean) = {
/* Determine partition where the predicate is not satisfied
for the first time */
val i = rdd.mapPartitionsWithIndex((i, iter) =>
if (iter.exists(!pred(_))) Iterator(i) else Iterator()
).min
/* Process partitions dropping elements after the first one
which doesn't satisfy the predicate */
rdd.mapPartitionsWithIndex {
case (j, iter) if j < i => iter // before i-th take all elements
// i-th drop after the element of interest
case (j, iter) if j == i => iter.takeWhile(pred)
case _ => Iterator() // after i-th drop all
}
}
val rdd = sc.parallelize(Seq(3, 4, 7, 1, 3, 0, 4, 6))
takeWhile(rdd)(_ != 0).collect
// Array[Int] = Array(3, 4, 7, 1, 3)
takeWhile(rdd)(_ != 3).collect
// Array[Int] = Array()
takeWhile(rdd)(_ >= 2).collect
// Array[Int] = Array(3, 4, 7)
请注意,这不会修改分区数,因此如果不重新分区,可能会导致资源利用率不理想。
归根结底,将这种类型的顺序逻辑应用于 Spark 几乎没有意义。也不是说它不修改分区数所以
如何在 spark 中中断地图作业:
val rddFiltered = rdd.map(value => value.toInt).map(value => {
if (value == 0)
// if the condition is true ,
// stop the map execution and return the processed RDD
value
})
假设我的 rdd 是:3,4,7,1,3,0,4,6
我要3,4,7,1,3
这不是您可以使用 Spark 有效解决的问题。无法保证处理分区的顺序,并且不存在部分转换之类的东西。
如果您希望预期的记录数量足够小以供驱动程序处理,您可以通过迭代 runJob
实现与 take
类似的方法并收集分区,只要您找不到第一个包含满足谓词的值的值。
或者,以完整数据扫描为代价:
def takeWhile[T : ClassTag](rdd: RDD[T])(pred: T => Boolean) = {
/* Determine partition where the predicate is not satisfied
for the first time */
val i = rdd.mapPartitionsWithIndex((i, iter) =>
if (iter.exists(!pred(_))) Iterator(i) else Iterator()
).min
/* Process partitions dropping elements after the first one
which doesn't satisfy the predicate */
rdd.mapPartitionsWithIndex {
case (j, iter) if j < i => iter // before i-th take all elements
// i-th drop after the element of interest
case (j, iter) if j == i => iter.takeWhile(pred)
case _ => Iterator() // after i-th drop all
}
}
val rdd = sc.parallelize(Seq(3, 4, 7, 1, 3, 0, 4, 6))
takeWhile(rdd)(_ != 0).collect
// Array[Int] = Array(3, 4, 7, 1, 3)
takeWhile(rdd)(_ != 3).collect
// Array[Int] = Array()
takeWhile(rdd)(_ >= 2).collect
// Array[Int] = Array(3, 4, 7)
请注意,这不会修改分区数,因此如果不重新分区,可能会导致资源利用率不理想。
归根结底,将这种类型的顺序逻辑应用于 Spark 几乎没有意义。也不是说它不修改分区数所以