在火花中中止地图执行

Abort map execution in spark

如何在 spark 中中断地图作业:

val rddFiltered = rdd.map(value => value.toInt).map(value => {

  if (value == 0)
  // if the condition is true , 
  // stop the map execution and return the processed RDD
  value
})

假设我的 rdd 是:3,4,7,1,3,0,4,6

我要3,4,7,1,3

这不是您可以使用 Spark 有效解决的问题。无法保证处理分区的顺序,并且不存在部分转换之类的东西。

如果您希望预期的记录数量足够小以供驱动程序处理,您可以通过迭代 runJob 实现与 take 类似的方法并收集分区,只要您找不到第一个包含满足谓词的值的值。

或者,以完整数据扫描为代价:

def takeWhile[T : ClassTag](rdd: RDD[T])(pred: T => Boolean) = {
  /* Determine partition where the predicate is not satisfied
     for the first time */
  val i = rdd.mapPartitionsWithIndex((i, iter) => 
    if (iter.exists(!pred(_))) Iterator(i) else Iterator()
  ).min

  /* Process partitions dropping elements after the first one
     which doesn't satisfy the predicate */
  rdd.mapPartitionsWithIndex { 
    case (j, iter) if j < i => iter  // before i-th take all elements

    // i-th drop after the element of interest
    case (j, iter) if j == i => iter.takeWhile(pred) 

    case _ =>  Iterator() // after i-th drop all
  }
}


val rdd = sc.parallelize(Seq(3, 4, 7, 1, 3, 0, 4, 6))

takeWhile(rdd)(_ != 0).collect
// Array[Int] = Array(3, 4, 7, 1, 3)

takeWhile(rdd)(_ != 3).collect
// Array[Int] = Array()

takeWhile(rdd)(_ >= 2).collect
// Array[Int] = Array(3, 4, 7) 

请注意,这不会修改分区数,因此如果不重新分区,可能会导致资源利用率不理想。

归根结底,将这种类型的顺序逻辑应用于 Spark 几乎没有意义。也不是说它不修改分区数所以