Spark 花费太多时间并为某些任务创造了数千个工作岗位

Spark is taking too much time and creating thousands of jobs for some tasks

机器配置:

问题:

案例场景:

我正在将 3 个表提取到数据帧中,该数据帧少于 10 行。是的,少于 10(十)个。 将其提取到数据框中后,我多次执行连接、计数、显示和收集操作。当我执行我的程序时,Spark 会创建 40404 个作业 4 次。它表示 count 需要执行这些作业。我在程序中使用 count 4-5 次。等待超过 18 分钟(大约 18.5 到 20 分钟)后,它给了我预期的输出。

提前致谢。

示例代码 1:

def getGroups(id: Array[String], level: Int): DataFrame = {
    var lvl = level
    if (level >= 0) {
      for (iterated_id <- id) {
        val single_level_group = supportive_df.filter("id = '" + iterated_id + "' and level = " + level).select("family_id")
        //single_level_group.show()
        intermediate_df = intermediate_df.unionAll(single_level_group)
        //println("for loop portion...")
      }
      final_df = final_df.unionAll(intermediate_df)
      lvl -= 1
      val user_id_param = intermediate_df.collect().map { row => row.getString(0) }
      intermediate_df = empty_df
      //println("new method...if portion...")
      getGroups(user_id_param, lvl)
    } else {
      //println("new method...")
      final_df.distinct()
    }
  }

示例代码 2:

 setGetGroupsVars("u_id", user_id.toString(), sa_user_df)
  var user_belong_groups: DataFrame = empty_df
  val user_array = Array[String](user_id.toString())

  val user_levels = sa_user_df.filter("id = '" + user_id + "'").select("level").distinct().collect().map { x => x.getInt(0) }

  println(user_levels.length+"...rapak")
  println(user_id.toString())
  for (u_lvl <- user_levels) {
    val x1 = getGroups(user_array, u_lvl)
    x1.show()
    empty_df.show()
    user_belong_groups.show()
    user_belong_groups = user_belong_groups.unionAll(x1)
    x1.show()
  }
  setGetGroupsVars("obj_id", obj_id.toString(), obj_type_specific_df)
  var obj_belong_groups: DataFrame = empty_df
  val obj_array = Array[String](obj_id.toString())
  val obj_levels = obj_type_specific_df.filter("id = '" + obj_id + "'").select("level").distinct().collect().map { x => x.getInt(0) }
  println(obj_levels.length)
  for (ob_lvl <- obj_levels) {
    obj_belong_groups = obj_belong_groups.unionAll(getGroups(obj_array, ob_lvl))
  }
  user_belong_groups = user_belong_groups.distinct()
  obj_belong_groups = obj_belong_groups.distinct()
  var user_obj_joined_df = user_belong_groups.join(obj_belong_groups)
  user_obj_joined_df.show()

  println("vbgdivsivbfb")
  var user_obj_access_df = user_obj_joined_df
    .join(sa_other_access_df, user_obj_joined_df("u_id") === sa_other_access_df("user_id")
      && user_obj_joined_df("obj_id") === sa_other_access_df("object_id"))
  user_obj_access_df.show()
  println("KDDD..")

  val user_obj_access_cond1 = user_obj_access_df.filter("u_id = '" + user_id + "' and obj_id != '" + obj_id + "'")
  if (user_obj_access_cond1.count() == 0) {
    val user_obj_access_cond2 = user_obj_access_df.filter("u_id != '" + user_id + "' and obj_id = '" + obj_id + "'")
    if (user_obj_access_cond2.count() == 0) {
      val user_obj_access_cond3 = user_obj_access_df.filter("u_id != '" + user_id + "' and obj_id != '" + obj_id + "'")
      if (user_obj_access_cond3.count() == 0) {
        default_df
      } else {
        val result_ugrp_to_objgrp = user_obj_access_cond3.select("permission").agg(max("permission"))
        println("cond4")
        result_ugrp_to_objgrp
      }
    } else {
      val result_ugrp_to_ob = user_obj_access_cond2.select("permission")
      println("cond3")
      result_ugrp_to_ob
    }
  } else {
    val result_u_to_obgrp = user_obj_access_cond1.select("permission")
    println("cond2")
    result_u_to_obgrp
  }
} else {
  println("cond1")
  individual_access
}

这两个是我程序中的主要代码块,执行时间过长。显示或计数操作通常需要很长时间。

好的,让我们记住一些基础知识!

Sparklazyshowcount 是动作。

动作触发转变,你有很多。如果您从 Cassandra(或任何其他来源)汇集数据,这会花费很多,因为您似乎没有缓存您的转换!

因此,当您在 DataFrame 或 RDD 上进行密集计算时,您需要考虑缓存,这将使您的操作执行得更快!

关于你有那么多任务(工作)的原因当然是通过 spark 并行机制来解释你执行的操作乘以你正在执行的 transformations/actions 的数量,更不用说循环了!

尽管如此,根据给出的信息和问题中发布的代码片段的质量,就我的回答而言。

希望对您有所帮助!

  • 首先你可以在 GUI 中检查程序的哪个阶段花费了很长时间。
  • 其次,您多次使用 distinct(),因此在使用 distinct() 时,您必须查看 distinct 之后有多少个分区。我认为这就是 spark 创造数千个工作岗位的原因。
  • 如果这是您可以在 distinct() 之后使用 coalesce() 的原因。