在 Apache Spark 中,如何使 RDD/DataFrame 操作惰性化?
In Apache Spark, how to make an RDD/DataFrame operation lazy?
假设我想编写一个函数 foo 来转换 DataFrame:
object Foo {
def foo(source: DataFrame): DataFrame = {
...complex iterative algorithm with a stopping condition...
}
}
由于foo的实现有很多"Actions"(collect、reduce等),调用foo会立即触发昂贵的执行。
这不是什么大问题,但是由于 foo 只是将一个 DataFrame 转换为另一个 DataFrame,按照惯例,允许惰性执行应该更好:只有当结果 DataFrame 或其衍生物(s ) 正在驱动程序上使用(通过另一个 "Action")。
到目前为止,唯一可靠地实现这一点的方法是将所有实现写入一个SparkPlan,并将其叠加到DataFrame的SparkExecution中,这非常容易出错并且涉及大量样板代码。推荐的方法是什么?
我不太清楚你试图实现什么,但 Scala 本身至少提供了一些你可能会觉得有用的工具:
惰性值:
val rdd = sc.range(0, 10000)
lazy val count = rdd.count // Nothing is executed here
// count: Long = <lazy>
count // count is evaluated only when it is actually used
// Long = 10000
按名称调用(在函数定义中用 =>
表示):
def foo(first: => Long, second: => Long, takeFirst: Boolean): Long =
if (takeFirst) first else second
val rdd1 = sc.range(0, 10000)
val rdd2 = sc.range(0, 10000)
foo(
{ println("first"); rdd1.count },
{ println("second"); rdd2.count },
true // Only first will be evaluated
)
// first
// Long = 10000
注意:在实践中,您应该创建本地惰性绑定以确保不会在每次访问时评估参数。
无限惰性集合,如 Stream
import org.apache.spark.mllib.random.RandomRDDs._
val initial = normalRDD(sc, 1000000L, 10)
// Infinite stream of RDDs and actions and nothing blows :)
val stream: Stream[RDD[Double]] = Stream(initial).append(
stream.map {
case rdd if !rdd.isEmpty =>
val mu = rdd.mean
rdd.filter(_ > mu)
case _ => sc.emptyRDD[Double]
}
)
其中的一些子集应该足以实现复杂的惰性计算。
假设我想编写一个函数 foo 来转换 DataFrame:
object Foo {
def foo(source: DataFrame): DataFrame = {
...complex iterative algorithm with a stopping condition...
}
}
由于foo的实现有很多"Actions"(collect、reduce等),调用foo会立即触发昂贵的执行。
这不是什么大问题,但是由于 foo 只是将一个 DataFrame 转换为另一个 DataFrame,按照惯例,允许惰性执行应该更好:只有当结果 DataFrame 或其衍生物(s ) 正在驱动程序上使用(通过另一个 "Action")。
到目前为止,唯一可靠地实现这一点的方法是将所有实现写入一个SparkPlan,并将其叠加到DataFrame的SparkExecution中,这非常容易出错并且涉及大量样板代码。推荐的方法是什么?
我不太清楚你试图实现什么,但 Scala 本身至少提供了一些你可能会觉得有用的工具:
惰性值:
val rdd = sc.range(0, 10000) lazy val count = rdd.count // Nothing is executed here // count: Long = <lazy> count // count is evaluated only when it is actually used // Long = 10000
按名称调用(在函数定义中用
=>
表示):def foo(first: => Long, second: => Long, takeFirst: Boolean): Long = if (takeFirst) first else second val rdd1 = sc.range(0, 10000) val rdd2 = sc.range(0, 10000) foo( { println("first"); rdd1.count }, { println("second"); rdd2.count }, true // Only first will be evaluated ) // first // Long = 10000
注意:在实践中,您应该创建本地惰性绑定以确保不会在每次访问时评估参数。
无限惰性集合,如
Stream
import org.apache.spark.mllib.random.RandomRDDs._ val initial = normalRDD(sc, 1000000L, 10) // Infinite stream of RDDs and actions and nothing blows :) val stream: Stream[RDD[Double]] = Stream(initial).append( stream.map { case rdd if !rdd.isEmpty => val mu = rdd.mean rdd.filter(_ > mu) case _ => sc.emptyRDD[Double] } )
其中的一些子集应该足以实现复杂的惰性计算。