Spark - 在迭代(或递归)函数调用的情况下如何处理惰性评估
Spark - how to handle with lazy evaluation in case of iterative (or recursive) function calls
我有一个递归函数,需要将当前调用的结果与上一次调用的结果进行比较,以确定是否已经达到收敛。我的函数不包含任何 action
- 它只包含 map
、flatMap
和 reduceByKey
。由于 Spark 不评估转换(直到调用一个动作),我的下一次迭代没有获得正确的值来比较收敛。
这是函数的骨架 -
def func1(sc: SparkContext, nodes:RDD[List[Long]], didConverge: Boolean, changeCount: Int) RDD[(Long] = {
if (didConverge)
nodes
else {
val currChangeCount = sc.accumulator(0, "xyz")
val newNodes = performSomeOps(nodes, currChangeCount) // does a few map/flatMap/reduceByKey operations
if (currChangeCount.value == changeCount) {
func1(sc, newNodes, true, currChangeCount.value)
} else {
func1(sc, newNode, false, currChangeCount.value)
}
}
}
performSomeOps
仅包含 map
、flatMap
和 reduceByKey
转换。由于它没有任何动作,因此 performSomeOps
中的代码不会执行。所以我的 currChangeCount
没有得到实际计数。这意味着,检查收敛的条件 (currChangeCount.value == changeCount
) 将无效。克服的一种方法是通过调用 count
在每次迭代中强制执行操作,但这是不必要的开销。
我想知道我可以做些什么来强制执行一项操作 w/o 很多开销,或者是否有其他方法可以解决这个问题?
我看到这些 map/flatMap/reduceByKey
转换正在更新累加器。因此,执行所有更新的唯一方法是执行所有这些功能,而 count
是实现此目的的最简单方法,并且与其他方法相比开销最低 (cache
+ count
, first
或 collect
).
我相信这里有一个非常important thing你想念的:
For accumulator updates performed inside actions only, Spark guarantees that each task’s update to the accumulator will only be applied once, i.e. restarted tasks will not update the value. In transformations, users should be aware of that each task’s update may be applied more than once if tasks or job stages are re-executed.
因为累加器不能可靠地用于管理控制流,更适合作业监控。
此外,执行一个动作并不是不必要的开销。如果你想知道你必须执行它的计算结果是什么。当然,除非结果微不足道。最便宜的行动可能是:
rdd.foreach { case _ => }
但它不会解决您在这里遇到的问题。
一般来说,Spark 中的迭代计算结构如下:
def func1(chcekpoinInterval: Int)(sc: SparkContext, nodes:RDD[List[Long]],
didConverge: Boolean, changeCount: Int, iteration: Int) RDD[(Long] = {
if (didConverge) nodes
else {
// Compute and cache new nodes
val newNodes = performSomeOps(nodes, currChangeCount).cache
// Periodically checkpoint to avoid stack overflow
if (iteration % checkpointInterval == 0) newNodes.checkpoint
/* Call a function which computes values
that determines control flow. This execute an action on newNodes.
*/
val changeCount = computeChangeCount(newNodes)
// Unpersist old nodes
nodes.unpersist
func1(checkpointInterval)(
sc, newNodes, currChangeCount.value == changeCount,
currChangeCount.value, iteration + 1
)
}
}
之前的答案让我走上了解决类似收敛检测问题的正确轨道。
foreach
在 the docs 中显示为:
foreach(func)
: Run a function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems.
似乎而不是使用 rdd.foreach()
作为廉价操作来触发放置在各种转换中的累加器增量,它应该用来做递增本身。
我无法生成 scala 示例,但这里有一个基本的 java 版本,如果它仍然有用的话:
// Convergence is reached when two iterations
// return the same number of results
long previousCount = -1;
long currentCount = 0;
while (previousCount != currentCount){
rdd = doSomethingThatUpdatesRdd(rdd);
// Count entries in new rdd with foreach + accumulator
rdd.foreach(tuple -> accumulator.add(1));
// Update helper values
previousCount = currentCount;
currentCount = accumulator.sum();
accumulator.reset();
}
// Convergence is reached
我有一个递归函数,需要将当前调用的结果与上一次调用的结果进行比较,以确定是否已经达到收敛。我的函数不包含任何 action
- 它只包含 map
、flatMap
和 reduceByKey
。由于 Spark 不评估转换(直到调用一个动作),我的下一次迭代没有获得正确的值来比较收敛。
这是函数的骨架 -
def func1(sc: SparkContext, nodes:RDD[List[Long]], didConverge: Boolean, changeCount: Int) RDD[(Long] = {
if (didConverge)
nodes
else {
val currChangeCount = sc.accumulator(0, "xyz")
val newNodes = performSomeOps(nodes, currChangeCount) // does a few map/flatMap/reduceByKey operations
if (currChangeCount.value == changeCount) {
func1(sc, newNodes, true, currChangeCount.value)
} else {
func1(sc, newNode, false, currChangeCount.value)
}
}
}
performSomeOps
仅包含 map
、flatMap
和 reduceByKey
转换。由于它没有任何动作,因此 performSomeOps
中的代码不会执行。所以我的 currChangeCount
没有得到实际计数。这意味着,检查收敛的条件 (currChangeCount.value == changeCount
) 将无效。克服的一种方法是通过调用 count
在每次迭代中强制执行操作,但这是不必要的开销。
我想知道我可以做些什么来强制执行一项操作 w/o 很多开销,或者是否有其他方法可以解决这个问题?
我看到这些 map/flatMap/reduceByKey
转换正在更新累加器。因此,执行所有更新的唯一方法是执行所有这些功能,而 count
是实现此目的的最简单方法,并且与其他方法相比开销最低 (cache
+ count
, first
或 collect
).
我相信这里有一个非常important thing你想念的:
For accumulator updates performed inside actions only, Spark guarantees that each task’s update to the accumulator will only be applied once, i.e. restarted tasks will not update the value. In transformations, users should be aware of that each task’s update may be applied more than once if tasks or job stages are re-executed.
因为累加器不能可靠地用于管理控制流,更适合作业监控。
此外,执行一个动作并不是不必要的开销。如果你想知道你必须执行它的计算结果是什么。当然,除非结果微不足道。最便宜的行动可能是:
rdd.foreach { case _ => }
但它不会解决您在这里遇到的问题。
一般来说,Spark 中的迭代计算结构如下:
def func1(chcekpoinInterval: Int)(sc: SparkContext, nodes:RDD[List[Long]],
didConverge: Boolean, changeCount: Int, iteration: Int) RDD[(Long] = {
if (didConverge) nodes
else {
// Compute and cache new nodes
val newNodes = performSomeOps(nodes, currChangeCount).cache
// Periodically checkpoint to avoid stack overflow
if (iteration % checkpointInterval == 0) newNodes.checkpoint
/* Call a function which computes values
that determines control flow. This execute an action on newNodes.
*/
val changeCount = computeChangeCount(newNodes)
// Unpersist old nodes
nodes.unpersist
func1(checkpointInterval)(
sc, newNodes, currChangeCount.value == changeCount,
currChangeCount.value, iteration + 1
)
}
}
之前的答案让我走上了解决类似收敛检测问题的正确轨道。
foreach
在 the docs 中显示为:
foreach(func)
: Run a function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems.
似乎而不是使用 rdd.foreach()
作为廉价操作来触发放置在各种转换中的累加器增量,它应该用来做递增本身。
我无法生成 scala 示例,但这里有一个基本的 java 版本,如果它仍然有用的话:
// Convergence is reached when two iterations
// return the same number of results
long previousCount = -1;
long currentCount = 0;
while (previousCount != currentCount){
rdd = doSomethingThatUpdatesRdd(rdd);
// Count entries in new rdd with foreach + accumulator
rdd.foreach(tuple -> accumulator.add(1));
// Update helper values
previousCount = currentCount;
currentCount = accumulator.sum();
accumulator.reset();
}
// Convergence is reached