如何计时Spark程序执行速度

How to time Spark program execution speed

我想为我的 Spark 程序执行速度计时,但由于懒惰,这非常困难。让我们在这里考虑这个(无意义的)代码:

var graph = GraphLoader.edgeListFile(context, args(0))
val graph_degs = graph.outerJoinVertices(graph.degrees).triplets.cache

/* I'd need to start the timer here */
val t1 = System.currentTimeMillis  
val edges = graph_degs.flatMap(trip =>  { /* do something*/ })
                      .union(graph_degs)

val count = edges.count
val t2 = System.currentTimeMillis 
/* I'd need to stop the timer here */

println("It took " + t2-t1 + " to count " + count)

问题是,转换是惰性的,所以在 val count = edges.count 行之前没有任何计算。但是根据我的观点 t1 得到一个值,尽管上面的代码没有值......尽管代码中的位置,上面的代码 t1 在计时器启动后得到评估。这是个问题...

在 Spark Web UI 我找不到任何有趣的东西,因为我需要在特定代码行之后花费时间。您认为是否有一个简单的解决方案来查看一组转换何时得到真实评估?

Spark Web UI 记录每个动作,甚至报告该动作每个阶段的时间 - 都在里面!您需要查看阶段选项卡,而不是作业。我发现它只有在您编译并提交代码时才可用。它在 REPL 中没有用,你是否有机会使用它?

由于连续的转换(在同一个 task - 意思是,它们没有被 shuffles 分开并且作为相同 shuffle 的一部分执行=23=]action) 作为单个 "step" 执行,Spark not 单独测量它们。从驱动程序代码 - 你也不能。

可以做的是测量将您的函数应用于每条记录的持续时间,并使用累加器 总结一下,例如:

// create accumulator
val durationAccumulator = sc.longAccumulator("flatMapDuration")

// "wrap" your "doSomething" operation with time measurement, and add to accumulator
val edges = rdd.flatMap(trip => {
  val t1 = System.currentTimeMillis
  val result = doSomething(trip)
  val t2 = System.currentTimeMillis
  durationAccumulator.add(t2 - t1)
  result
})

// perform the action that would trigger evaluation
val count = edges.count

// now you can read the accumulated value
println("It took " + durationAccumulator.value + " to flatMap " + count)

您可以对任何单个转换重复此操作。

免责声明:

  • 当然,这不包括 Spark 花在整理事物和进行实际计数上的时间 - 为此,事实上,Spark UI 是您最好的资源。
  • 请注意,累加器对重试之类的事情很敏感 - 重试任务将更新累加器两次。

样式注释: 您可以通过创建一个 measure 函数使此代码更可重用,该函数围绕任何函数 "wraps" 并更新给定的累加器:

// write this once:
def measure[T, R](action: T => R, acc: LongAccumulator): T => R = input => { 
  val t1 = System.currentTimeMillis
  val result = action(input)
  val t2 = System.currentTimeMillis
  acc.add(t2 - t1)
  result
}

// use it with any transformation:
rdd.flatMap(measure(doSomething, durationAccumulator))