Spark:通过 UDF 分配少量计算密集型任务
Spark: Distribute low number of compute-intensive tasks via UDF
我有一个带有 5 个可用于计算的工作节点的 spark 集群(在 Azure Databricks 中)。但是,我需要解决的任务与典型的 spark 用例不同:我没有一个需要应用于数百万行的简单任务,而是必须 运行 对 60 行数据进行非常复杂的操作.
我的意图是基本上将60个任务分配给5个工人,让每个工人处理60/5 = 12
个任务。为此,我知道执行者的数量应该等于工人的数量。好像是这样的,如运行ning
所示
num_executors = len(spark.sparkContext._jsc.sc().statusTracker().getExecutorInfos()) - 1
# returns 5
这是一些天真的伪代码 运行s,但只在一个工人身上:
def my_complex_function(input):
# this function uses all available cores (internally parallelized)
# and takes about 15 minutes to complete per call if run on a
# single worker node
do_stuff(input_row)
write_output_to_file(stuff)
return(debug_message)
UDF_function = udf(lambda z: my_complex_function(input_row), StringType())
sdf = spark.createDataFrame(data=data,schema=["data"])
# sdf contains 60 rows and a single column, "data".
# "data" is just a path to blob storage file that needs to be processed.
sdf_new = sdf.withColumn("output", UDF_function(col("data")))
display(sdf_new) # <- Triggers the computation
如前所述,这似乎 运行 仅适用于单个工人。我假设这是因为我的数据集太小了,它没有分发给不同的工人——我试图用以下方法解决这个问题:
sdf = sdf.repartition(num_executors)
然而,这仍然不起作用。如 Spark UI 和我的日志文件所示,只使用了一个工人。
我需要设置什么才能让每个执行者 运行 并行地分担任务?
display
函数正在推测安排尽可能少的任务,以产生上限为 1000 行的输出。它从安排一项任务开始,并希望这就足够了。然后是 4、20……等等。你的情况需要很长时间。
您可以尝试在驱动程序处收集所有内容:
sdf_new.collect()
通过在驱动程序中收集所有内容,您肯定会触发对数据框的完整评估。
我有一个带有 5 个可用于计算的工作节点的 spark 集群(在 Azure Databricks 中)。但是,我需要解决的任务与典型的 spark 用例不同:我没有一个需要应用于数百万行的简单任务,而是必须 运行 对 60 行数据进行非常复杂的操作.
我的意图是基本上将60个任务分配给5个工人,让每个工人处理60/5 = 12
个任务。为此,我知道执行者的数量应该等于工人的数量。好像是这样的,如运行ning
num_executors = len(spark.sparkContext._jsc.sc().statusTracker().getExecutorInfos()) - 1
# returns 5
这是一些天真的伪代码 运行s,但只在一个工人身上:
def my_complex_function(input):
# this function uses all available cores (internally parallelized)
# and takes about 15 minutes to complete per call if run on a
# single worker node
do_stuff(input_row)
write_output_to_file(stuff)
return(debug_message)
UDF_function = udf(lambda z: my_complex_function(input_row), StringType())
sdf = spark.createDataFrame(data=data,schema=["data"])
# sdf contains 60 rows and a single column, "data".
# "data" is just a path to blob storage file that needs to be processed.
sdf_new = sdf.withColumn("output", UDF_function(col("data")))
display(sdf_new) # <- Triggers the computation
如前所述,这似乎 运行 仅适用于单个工人。我假设这是因为我的数据集太小了,它没有分发给不同的工人——我试图用以下方法解决这个问题:
sdf = sdf.repartition(num_executors)
然而,这仍然不起作用。如 Spark UI 和我的日志文件所示,只使用了一个工人。
我需要设置什么才能让每个执行者 运行 并行地分担任务?
display
函数正在推测安排尽可能少的任务,以产生上限为 1000 行的输出。它从安排一项任务开始,并希望这就足够了。然后是 4、20……等等。你的情况需要很长时间。
您可以尝试在驱动程序处收集所有内容:
sdf_new.collect()
通过在驱动程序中收集所有内容,您肯定会触发对数据框的完整评估。