Spark 花费太多时间并为某些任务创造了数千个工作岗位
Spark is taking too much time and creating thousands of jobs for some tasks
机器配置:
- 内存:16GB
- 处理器:4 核(Xeon E3 3.3 GHz)
问题:
- 耗时:超过 18 分钟
案例场景:
Spark Mode: Local
数据库:使用 Cassandra 2.1.12
我正在将 3 个表提取到数据帧中,该数据帧少于 10 行。是的,少于 10(十)个。
将其提取到数据框中后,我多次执行连接、计数、显示和收集操作。当我执行我的程序时,Spark 会创建 40404 个作业 4 次。它表示 count 需要执行这些作业。我在程序中使用 count 4-5 次。等待超过 18 分钟(大约 18.5 到 20 分钟)后,它给了我预期的输出。
- 为什么 Spark 创造了这么多工作岗位?
- 执行这么多作业(大约 40404 * 4)需要花费这么多时间(18 分钟)是否显而易见('ok')?
提前致谢。
示例代码 1:
def getGroups(id: Array[String], level: Int): DataFrame = {
var lvl = level
if (level >= 0) {
for (iterated_id <- id) {
val single_level_group = supportive_df.filter("id = '" + iterated_id + "' and level = " + level).select("family_id")
//single_level_group.show()
intermediate_df = intermediate_df.unionAll(single_level_group)
//println("for loop portion...")
}
final_df = final_df.unionAll(intermediate_df)
lvl -= 1
val user_id_param = intermediate_df.collect().map { row => row.getString(0) }
intermediate_df = empty_df
//println("new method...if portion...")
getGroups(user_id_param, lvl)
} else {
//println("new method...")
final_df.distinct()
}
}
示例代码 2:
setGetGroupsVars("u_id", user_id.toString(), sa_user_df)
var user_belong_groups: DataFrame = empty_df
val user_array = Array[String](user_id.toString())
val user_levels = sa_user_df.filter("id = '" + user_id + "'").select("level").distinct().collect().map { x => x.getInt(0) }
println(user_levels.length+"...rapak")
println(user_id.toString())
for (u_lvl <- user_levels) {
val x1 = getGroups(user_array, u_lvl)
x1.show()
empty_df.show()
user_belong_groups.show()
user_belong_groups = user_belong_groups.unionAll(x1)
x1.show()
}
setGetGroupsVars("obj_id", obj_id.toString(), obj_type_specific_df)
var obj_belong_groups: DataFrame = empty_df
val obj_array = Array[String](obj_id.toString())
val obj_levels = obj_type_specific_df.filter("id = '" + obj_id + "'").select("level").distinct().collect().map { x => x.getInt(0) }
println(obj_levels.length)
for (ob_lvl <- obj_levels) {
obj_belong_groups = obj_belong_groups.unionAll(getGroups(obj_array, ob_lvl))
}
user_belong_groups = user_belong_groups.distinct()
obj_belong_groups = obj_belong_groups.distinct()
var user_obj_joined_df = user_belong_groups.join(obj_belong_groups)
user_obj_joined_df.show()
println("vbgdivsivbfb")
var user_obj_access_df = user_obj_joined_df
.join(sa_other_access_df, user_obj_joined_df("u_id") === sa_other_access_df("user_id")
&& user_obj_joined_df("obj_id") === sa_other_access_df("object_id"))
user_obj_access_df.show()
println("KDDD..")
val user_obj_access_cond1 = user_obj_access_df.filter("u_id = '" + user_id + "' and obj_id != '" + obj_id + "'")
if (user_obj_access_cond1.count() == 0) {
val user_obj_access_cond2 = user_obj_access_df.filter("u_id != '" + user_id + "' and obj_id = '" + obj_id + "'")
if (user_obj_access_cond2.count() == 0) {
val user_obj_access_cond3 = user_obj_access_df.filter("u_id != '" + user_id + "' and obj_id != '" + obj_id + "'")
if (user_obj_access_cond3.count() == 0) {
default_df
} else {
val result_ugrp_to_objgrp = user_obj_access_cond3.select("permission").agg(max("permission"))
println("cond4")
result_ugrp_to_objgrp
}
} else {
val result_ugrp_to_ob = user_obj_access_cond2.select("permission")
println("cond3")
result_ugrp_to_ob
}
} else {
val result_u_to_obgrp = user_obj_access_cond1.select("permission")
println("cond2")
result_u_to_obgrp
}
} else {
println("cond1")
individual_access
}
这两个是我程序中的主要代码块,执行时间过长。显示或计数操作通常需要很长时间。
好的,让我们记住一些基础知识!
Spark 是 lazy,show
和 count
是动作。
动作触发转变,你有很多。如果您从 Cassandra(或任何其他来源)汇集数据,这会花费很多,因为您似乎没有缓存您的转换!
因此,当您在 DataFrame 或 RDD 上进行密集计算时,您需要考虑缓存,这将使您的操作执行得更快!
关于你有那么多任务(工作)的原因当然是通过 spark 并行机制来解释你执行的操作乘以你正在执行的 transformations/actions 的数量,更不用说循环了!
尽管如此,根据给出的信息和问题中发布的代码片段的质量,就我的回答而言。
希望对您有所帮助!
- 首先你可以在 GUI 中检查程序的哪个阶段花费了很长时间。
- 其次,您多次使用
distinct()
,因此在使用 distinct()
时,您必须查看 distinct 之后有多少个分区。我认为这就是 spark 创造数千个工作岗位的原因。
- 如果这是您可以在
distinct()
之后使用 coalesce()
的原因。
机器配置:
- 内存:16GB
- 处理器:4 核(Xeon E3 3.3 GHz)
问题:
- 耗时:超过 18 分钟
案例场景:
Spark Mode: Local
数据库:使用 Cassandra 2.1.12
我正在将 3 个表提取到数据帧中,该数据帧少于 10 行。是的,少于 10(十)个。 将其提取到数据框中后,我多次执行连接、计数、显示和收集操作。当我执行我的程序时,Spark 会创建 40404 个作业 4 次。它表示 count 需要执行这些作业。我在程序中使用 count 4-5 次。等待超过 18 分钟(大约 18.5 到 20 分钟)后,它给了我预期的输出。
- 为什么 Spark 创造了这么多工作岗位?
- 执行这么多作业(大约 40404 * 4)需要花费这么多时间(18 分钟)是否显而易见('ok')?
提前致谢。
示例代码 1:
def getGroups(id: Array[String], level: Int): DataFrame = {
var lvl = level
if (level >= 0) {
for (iterated_id <- id) {
val single_level_group = supportive_df.filter("id = '" + iterated_id + "' and level = " + level).select("family_id")
//single_level_group.show()
intermediate_df = intermediate_df.unionAll(single_level_group)
//println("for loop portion...")
}
final_df = final_df.unionAll(intermediate_df)
lvl -= 1
val user_id_param = intermediate_df.collect().map { row => row.getString(0) }
intermediate_df = empty_df
//println("new method...if portion...")
getGroups(user_id_param, lvl)
} else {
//println("new method...")
final_df.distinct()
}
}
示例代码 2:
setGetGroupsVars("u_id", user_id.toString(), sa_user_df)
var user_belong_groups: DataFrame = empty_df
val user_array = Array[String](user_id.toString())
val user_levels = sa_user_df.filter("id = '" + user_id + "'").select("level").distinct().collect().map { x => x.getInt(0) }
println(user_levels.length+"...rapak")
println(user_id.toString())
for (u_lvl <- user_levels) {
val x1 = getGroups(user_array, u_lvl)
x1.show()
empty_df.show()
user_belong_groups.show()
user_belong_groups = user_belong_groups.unionAll(x1)
x1.show()
}
setGetGroupsVars("obj_id", obj_id.toString(), obj_type_specific_df)
var obj_belong_groups: DataFrame = empty_df
val obj_array = Array[String](obj_id.toString())
val obj_levels = obj_type_specific_df.filter("id = '" + obj_id + "'").select("level").distinct().collect().map { x => x.getInt(0) }
println(obj_levels.length)
for (ob_lvl <- obj_levels) {
obj_belong_groups = obj_belong_groups.unionAll(getGroups(obj_array, ob_lvl))
}
user_belong_groups = user_belong_groups.distinct()
obj_belong_groups = obj_belong_groups.distinct()
var user_obj_joined_df = user_belong_groups.join(obj_belong_groups)
user_obj_joined_df.show()
println("vbgdivsivbfb")
var user_obj_access_df = user_obj_joined_df
.join(sa_other_access_df, user_obj_joined_df("u_id") === sa_other_access_df("user_id")
&& user_obj_joined_df("obj_id") === sa_other_access_df("object_id"))
user_obj_access_df.show()
println("KDDD..")
val user_obj_access_cond1 = user_obj_access_df.filter("u_id = '" + user_id + "' and obj_id != '" + obj_id + "'")
if (user_obj_access_cond1.count() == 0) {
val user_obj_access_cond2 = user_obj_access_df.filter("u_id != '" + user_id + "' and obj_id = '" + obj_id + "'")
if (user_obj_access_cond2.count() == 0) {
val user_obj_access_cond3 = user_obj_access_df.filter("u_id != '" + user_id + "' and obj_id != '" + obj_id + "'")
if (user_obj_access_cond3.count() == 0) {
default_df
} else {
val result_ugrp_to_objgrp = user_obj_access_cond3.select("permission").agg(max("permission"))
println("cond4")
result_ugrp_to_objgrp
}
} else {
val result_ugrp_to_ob = user_obj_access_cond2.select("permission")
println("cond3")
result_ugrp_to_ob
}
} else {
val result_u_to_obgrp = user_obj_access_cond1.select("permission")
println("cond2")
result_u_to_obgrp
}
} else {
println("cond1")
individual_access
}
这两个是我程序中的主要代码块,执行时间过长。显示或计数操作通常需要很长时间。
好的,让我们记住一些基础知识!
Spark 是 lazy,show
和 count
是动作。
动作触发转变,你有很多。如果您从 Cassandra(或任何其他来源)汇集数据,这会花费很多,因为您似乎没有缓存您的转换!
因此,当您在 DataFrame 或 RDD 上进行密集计算时,您需要考虑缓存,这将使您的操作执行得更快!
关于你有那么多任务(工作)的原因当然是通过 spark 并行机制来解释你执行的操作乘以你正在执行的 transformations/actions 的数量,更不用说循环了!
尽管如此,根据给出的信息和问题中发布的代码片段的质量,就我的回答而言。
希望对您有所帮助!
- 首先你可以在 GUI 中检查程序的哪个阶段花费了很长时间。
- 其次,您多次使用
distinct()
,因此在使用distinct()
时,您必须查看 distinct 之后有多少个分区。我认为这就是 spark 创造数千个工作岗位的原因。 - 如果这是您可以在
distinct()
之后使用coalesce()
的原因。