R 并行处理 - 节点选择
R Parallel Processing - Node Choice
我正在尝试在 Windows 上使用计算机上的 parallel 包在 R 中处理大量数据有 8 个核心。我有一个很大的 data.frame 需要逐行处理。对于每一行,我可以估计处理该行需要多长时间,这可能从每行 10 秒到 4 小时不等。
我不想 运行 在 clusterApplyLB 函数下一次完成整个程序(我知道这可能是最佳方法)因为如果遇到错误,那么我的整个结果集可能迷路了。我第一次尝试 运行 我的程序涉及将它分解成块,然后 运行 并行地单独连接每个块,保存该并行的输出 运行 然后继续下一个块.
问题在于它 运行 遍历行,而不是 运行ning 7x "real" 时间(我有 8 个核心,但我想保留一个备用) , 它似乎只是 运行ning 大约 2 倍。我猜这是因为每个核心的行分配效率低下。
例如,2 个核心的 10 行数据,其中两行可能 运行 需要 4 小时,另外两行需要 10 秒。理论上,这可能需要 4 小时 10 秒才能到达 运行,但如果分配效率低下,则可能需要 8 小时。 (显然这是夸大了,但是当估计不正确时,可能会出现更多核心和更多行的类似情况)
如果我估计这些时间并以我估计的正确顺序将它们提交给 clusterApplyLB(以将估计时间分布在多个核心上以最大限度地减少所花费的时间),它们可能不会被发送到核心我希望他们成为,因为他们可能无法在我估计的时间内完成。例如,我估计两个进程的时间分别为 10 分钟和 12 分钟,它们分别花费 11.6 分钟和 11.4 秒,然后将行提交到 clusterApplyLB 的顺序将不是我预期的。这种错误可能看起来很小,但是如果我优化了多个长时间的行,那么这种顺序混淆可能会导致两个 4 小时的行转到同一个节点而不是不同的节点(这几乎可以使我的双倍总时间)。
长话短说;博士。我的问题:有没有办法告诉 R 并行处理函数(例如 clusterApplyLB、clusterApply、parApply 或任何 sapply、lapply 或 foreach 变体)哪些行应该发送到哪些 core/node?即使没有我所处的情况,我认为这将是一个非常有用和有趣的信息提供。
我想说您的问题有 2 种不同的可能解决方法。
第一个是根据每个作业的预期计算时间对作业到节点映射的静态优化。在开始计算之前,您将为每个作业(即数据框的行)分配一个节点。下面给出了可能的实现代码。
第二种解决方案是动态的,您必须根据 clusterApplyLB
中给出的代码制作自己的负载均衡器。您将以与第一种方法相同的方式开始,但一旦作业完成,您将不得不重新计算最佳作业到节点的映射。根据您的问题,由于不断进行重新优化,这可能会增加大量开销。我认为只要你对预期的计算时间没有偏见,就没有必要走这条路。
这里是第一种解决方法的代码:
library(parallel)
#set seed for reproducible example
set.seed(1234)
#let's say you have 100 calculations (i.e., rows)
#each of them takes between 0 and 1 second computation time
expected_job_length=runif(100)
#this is your data
#real_job_length is unknown but we use it in the mock-up function below
df=data.frame(job_id=seq_along(expected_job_length),
expected_job_length=expected_job_length,
#real_job_length=expected_job_length + some noise
real_job_length=expected_job_length+
runif(length(expected_job_length),-0.05,0.05))
#we might have a negative real_job_length; fix that
df=within(df,real_job_length[real_job_length<0]<-
real_job_length[real_job_length<0]+0.05)
#detectCores() gives in my case 4
cluster_size=4
准备作业到节点的映射优化:
#x will give the node_id (between 1 and cluster_size) for each job
total_time=function(x,expected_job_length) {
#in the calculation below, x will be a vector of reals
#we have to translate it into integers in order to use it as index vector
x=as.integer(round(x))
#return max of sum of node-binned expected job lengths
max(sapply(split(expected_job_length,x),sum))
}
#now optimize the distribution of jobs amongst the nodes
#Genetic algorithm might be better for the optimization
#but Differential Evolution is good for now
library(DEoptim)
#pick large differential weighting factor (F) ...
#... to get out of local minimas due to rounding
res=DEoptim(fn=total_time,
lower=rep(1,nrow(df)),
upper=rep(cluster_size,nrow(df)),
expected_job_length=expected_job_length,
control=DEoptim.control(CR=0.85,F=1.5,trace=FALSE))
#wait for a minute or two ...
#inspect optimal solution
time_per_node=sapply(split(expected_job_length,
unname(round(res$optim$bestmem))),sum)
time_per_node
# 1 2 3 4
#10.91765 10.94893 10.94069 10.94246
plot(time_per_node,ylim=c(0,15))
abline(h=max(time_per_node),lty=2)
#add node-mapping to df
df$node_id=unname(round(res$optim$bestmem))
现在是在集群上进行计算的时候了:
#start cluster
workers=parallel::makeCluster(cluster_size)
start_time=Sys.time()
#distribute jobs according to optimal node-mapping
clusterApply(workers,split(df,df$node_id),function(x) {
for (i in seq_along(x$job_id)) {
#use tryCatch to do the error handling for jobs that fail
tryCatch({Sys.sleep(x[i,"real_job_length"])},
error=function(err) {print("Do your error handling")})
}
})
end_time=Sys.time()
#how long did it take
end_time-start_time
#Time difference of 11.12532 secs
#add to plot
abline(h=as.numeric(end_time-start_time),col="red",lty=2)
stopCluster(workers)
根据输入,您似乎已经在该任务中保存了该任务的输出。
假设每个并行任务都将输出保存为一个文件,您可能需要一个初始函数来预测特定行的时间。
为了做到这一点
- 生成具有估计时间和行号的结构
- 对估计时间进行排序并重新排序行,运行 并行
每个重新排序的行的过程。
这会自动平衡工作量。
我们有一个类似的问题,该过程必须逐列完成,每列花费 10-200 秒 。因此,我们生成了一个函数来估算时间,并根据该函数对列进行了重新排序,并对每列进行 运行 并行处理。
我正在尝试在 Windows 上使用计算机上的 parallel 包在 R 中处理大量数据有 8 个核心。我有一个很大的 data.frame 需要逐行处理。对于每一行,我可以估计处理该行需要多长时间,这可能从每行 10 秒到 4 小时不等。
我不想 运行 在 clusterApplyLB 函数下一次完成整个程序(我知道这可能是最佳方法)因为如果遇到错误,那么我的整个结果集可能迷路了。我第一次尝试 运行 我的程序涉及将它分解成块,然后 运行 并行地单独连接每个块,保存该并行的输出 运行 然后继续下一个块.
问题在于它 运行 遍历行,而不是 运行ning 7x "real" 时间(我有 8 个核心,但我想保留一个备用) , 它似乎只是 运行ning 大约 2 倍。我猜这是因为每个核心的行分配效率低下。
例如,2 个核心的 10 行数据,其中两行可能 运行 需要 4 小时,另外两行需要 10 秒。理论上,这可能需要 4 小时 10 秒才能到达 运行,但如果分配效率低下,则可能需要 8 小时。 (显然这是夸大了,但是当估计不正确时,可能会出现更多核心和更多行的类似情况)
如果我估计这些时间并以我估计的正确顺序将它们提交给 clusterApplyLB(以将估计时间分布在多个核心上以最大限度地减少所花费的时间),它们可能不会被发送到核心我希望他们成为,因为他们可能无法在我估计的时间内完成。例如,我估计两个进程的时间分别为 10 分钟和 12 分钟,它们分别花费 11.6 分钟和 11.4 秒,然后将行提交到 clusterApplyLB 的顺序将不是我预期的。这种错误可能看起来很小,但是如果我优化了多个长时间的行,那么这种顺序混淆可能会导致两个 4 小时的行转到同一个节点而不是不同的节点(这几乎可以使我的双倍总时间)。
长话短说;博士。我的问题:有没有办法告诉 R 并行处理函数(例如 clusterApplyLB、clusterApply、parApply 或任何 sapply、lapply 或 foreach 变体)哪些行应该发送到哪些 core/node?即使没有我所处的情况,我认为这将是一个非常有用和有趣的信息提供。
我想说您的问题有 2 种不同的可能解决方法。
第一个是根据每个作业的预期计算时间对作业到节点映射的静态优化。在开始计算之前,您将为每个作业(即数据框的行)分配一个节点。下面给出了可能的实现代码。
第二种解决方案是动态的,您必须根据 clusterApplyLB
中给出的代码制作自己的负载均衡器。您将以与第一种方法相同的方式开始,但一旦作业完成,您将不得不重新计算最佳作业到节点的映射。根据您的问题,由于不断进行重新优化,这可能会增加大量开销。我认为只要你对预期的计算时间没有偏见,就没有必要走这条路。
这里是第一种解决方法的代码:
library(parallel)
#set seed for reproducible example
set.seed(1234)
#let's say you have 100 calculations (i.e., rows)
#each of them takes between 0 and 1 second computation time
expected_job_length=runif(100)
#this is your data
#real_job_length is unknown but we use it in the mock-up function below
df=data.frame(job_id=seq_along(expected_job_length),
expected_job_length=expected_job_length,
#real_job_length=expected_job_length + some noise
real_job_length=expected_job_length+
runif(length(expected_job_length),-0.05,0.05))
#we might have a negative real_job_length; fix that
df=within(df,real_job_length[real_job_length<0]<-
real_job_length[real_job_length<0]+0.05)
#detectCores() gives in my case 4
cluster_size=4
准备作业到节点的映射优化:
#x will give the node_id (between 1 and cluster_size) for each job
total_time=function(x,expected_job_length) {
#in the calculation below, x will be a vector of reals
#we have to translate it into integers in order to use it as index vector
x=as.integer(round(x))
#return max of sum of node-binned expected job lengths
max(sapply(split(expected_job_length,x),sum))
}
#now optimize the distribution of jobs amongst the nodes
#Genetic algorithm might be better for the optimization
#but Differential Evolution is good for now
library(DEoptim)
#pick large differential weighting factor (F) ...
#... to get out of local minimas due to rounding
res=DEoptim(fn=total_time,
lower=rep(1,nrow(df)),
upper=rep(cluster_size,nrow(df)),
expected_job_length=expected_job_length,
control=DEoptim.control(CR=0.85,F=1.5,trace=FALSE))
#wait for a minute or two ...
#inspect optimal solution
time_per_node=sapply(split(expected_job_length,
unname(round(res$optim$bestmem))),sum)
time_per_node
# 1 2 3 4
#10.91765 10.94893 10.94069 10.94246
plot(time_per_node,ylim=c(0,15))
abline(h=max(time_per_node),lty=2)
#add node-mapping to df
df$node_id=unname(round(res$optim$bestmem))
现在是在集群上进行计算的时候了:
#start cluster
workers=parallel::makeCluster(cluster_size)
start_time=Sys.time()
#distribute jobs according to optimal node-mapping
clusterApply(workers,split(df,df$node_id),function(x) {
for (i in seq_along(x$job_id)) {
#use tryCatch to do the error handling for jobs that fail
tryCatch({Sys.sleep(x[i,"real_job_length"])},
error=function(err) {print("Do your error handling")})
}
})
end_time=Sys.time()
#how long did it take
end_time-start_time
#Time difference of 11.12532 secs
#add to plot
abline(h=as.numeric(end_time-start_time),col="red",lty=2)
stopCluster(workers)
根据输入,您似乎已经在该任务中保存了该任务的输出。 假设每个并行任务都将输出保存为一个文件,您可能需要一个初始函数来预测特定行的时间。 为了做到这一点
- 生成具有估计时间和行号的结构
- 对估计时间进行排序并重新排序行,运行 并行 每个重新排序的行的过程。
这会自动平衡工作量。 我们有一个类似的问题,该过程必须逐列完成,每列花费 10-200 秒 。因此,我们生成了一个函数来估算时间,并根据该函数对列进行了重新排序,并对每列进行 运行 并行处理。