dopar foreach(运行 并行 for 循环)
dopar foreach (running a parallel for loop)
这个问题与运行在多核上使用 for 循环特别相关。我正在尝试学习如何 运行 使用并行内核的代码。实际代码有些复杂,因此我在此处重新创建 一个非常基本的稀释代码 。请注意,此示例仅用于说明目的,并非实际代码。
library(parallel)
library(foreach)
library(doParallel)
#Creating a mock dataframe
Event_ID = c(1,1,1,1,1,2,2,2,2,2,3,3,3,3)
Type=c("A","B","C","D","E","A","B","C","D","E","A","B","C","D")
Revenue1=c(24,9,51,7,22,15,86,66,0,57,44,93,34,37)
Revenue2=c(16,93,96,44,67,73,12,65,81,22,39,94,41,30)
z = data.frame(Event_ID,Type,Revenue1,Revenue2)
#replicates z 5000 times
n =5000
zz=do.call("rbind", replicate(n, z, simplify = FALSE))
zz$Revenue3 = 0
#################################################################
# **foreach, dopar failed attempt**
#################################################################
cl=parallel::makeCluster(14,type="PSOCK") #I have 8 core 16 threads but use 14 here. Please edit this accordingly.
registerDoParallel(cl)
home1 = function(zz1){
foreach(i=1:nrow(zz1), .combine = rbind) %dopar% {
zz1[i,'Revenue3'] = sqrt(zz1[i,'Revenue1'])+(zz1[i,'Revenue2'])
}
return(zz1)
}
zzz = home1(zz1=zz)
stopCluster(cl)
#################################################################
#Non parallel implementation
#################################################################
home2 = function(zz2){
zz3=zz2
for (i in 1:nrow(zz3)){
zz3[i,'Revenue3'] = sqrt(zz3[i,'Revenue1'])+(zz3[i,'Revenue2'])
}
return(zz3)
}
zzzz=home2(zz2=zz)
我创建了一个数据框并尝试使用 foreach 和 dopar,但它似乎不起作用。接下来我提供代码的非并行版本的实现。但是,并行版本对我不起作用。我得到的输出 df 与输入矩阵相同。我意识到我可能犯了一个基本错误,但我没有足够的经验来弄清楚到底是什么地方出了问题。任何帮助将不胜感激。
P.S。我确实意识到我的非并行版本不是最优的并且可以改进,但这只是一个例子。
首先请注意,运行 并行使用 parallel
、doParallel
或 foreach
包在覆盖现有 [=29= 中的值时有些受限].当通过这个包执行并行化时,正在启动并行 运行 R 会话,一个工作人员,它执行计算 returning 以某种方便的形式执行的任何结果。这意味着这些工作会话包含 none 原始会话中的对象,除非提供(使用 .export 或作为函数参数)。 future
、promises
和 ipc
包允许异步处理,同时在原始会话中修改变量,但代价是一些简单性。
请注意,由于每个会话都将使用一个单独的内核,因此使用的会话多于内核会降低整体性能。
至于实现本身,您想如何实现并行化取决于您的计算需要什么以及您想要哪种格式 return。如果你想执行简单的行式计算,你可以使用类似的东西:
library(iterators)
cl=parallel::makeCluster(4) #I have 8 core 16 threads but use 14 here. Please edit this accordingly.
registerDoParallel(cl)
stopCluster(cl)
home1 <- function(zz1){
output <- foreach(x = iter(zz1, by = "row"), .combine = rbind) %dopar% {
x[["Revenue3"]] <- sqrt(x[["Revenue1"]]) + x[["Revenue2"]]
x
}
output
}
zzz <- home1(zz1=zz)
stopCluster(cl)
请注意,我在这里使用了 Iterator 结构,可用于高效地迭代 rows/columns。如果这是您正在寻找的计算类型,我建议您采用矢量化方法,因为这将大大提高性能。
zz[["Revenue3"]] <- sqrt(zz[["Revenue2"]) + zz[["Revenue1"]]
后者在我的微型 4 核笔记本电脑上大约快 13000 倍。
这个问题与运行在多核上使用 for 循环特别相关。我正在尝试学习如何 运行 使用并行内核的代码。实际代码有些复杂,因此我在此处重新创建 一个非常基本的稀释代码 。请注意,此示例仅用于说明目的,并非实际代码。
library(parallel)
library(foreach)
library(doParallel)
#Creating a mock dataframe
Event_ID = c(1,1,1,1,1,2,2,2,2,2,3,3,3,3)
Type=c("A","B","C","D","E","A","B","C","D","E","A","B","C","D")
Revenue1=c(24,9,51,7,22,15,86,66,0,57,44,93,34,37)
Revenue2=c(16,93,96,44,67,73,12,65,81,22,39,94,41,30)
z = data.frame(Event_ID,Type,Revenue1,Revenue2)
#replicates z 5000 times
n =5000
zz=do.call("rbind", replicate(n, z, simplify = FALSE))
zz$Revenue3 = 0
#################################################################
# **foreach, dopar failed attempt**
#################################################################
cl=parallel::makeCluster(14,type="PSOCK") #I have 8 core 16 threads but use 14 here. Please edit this accordingly.
registerDoParallel(cl)
home1 = function(zz1){
foreach(i=1:nrow(zz1), .combine = rbind) %dopar% {
zz1[i,'Revenue3'] = sqrt(zz1[i,'Revenue1'])+(zz1[i,'Revenue2'])
}
return(zz1)
}
zzz = home1(zz1=zz)
stopCluster(cl)
#################################################################
#Non parallel implementation
#################################################################
home2 = function(zz2){
zz3=zz2
for (i in 1:nrow(zz3)){
zz3[i,'Revenue3'] = sqrt(zz3[i,'Revenue1'])+(zz3[i,'Revenue2'])
}
return(zz3)
}
zzzz=home2(zz2=zz)
我创建了一个数据框并尝试使用 foreach 和 dopar,但它似乎不起作用。接下来我提供代码的非并行版本的实现。但是,并行版本对我不起作用。我得到的输出 df 与输入矩阵相同。我意识到我可能犯了一个基本错误,但我没有足够的经验来弄清楚到底是什么地方出了问题。任何帮助将不胜感激。
P.S。我确实意识到我的非并行版本不是最优的并且可以改进,但这只是一个例子。
首先请注意,运行 并行使用 parallel
、doParallel
或 foreach
包在覆盖现有 [=29= 中的值时有些受限].当通过这个包执行并行化时,正在启动并行 运行 R 会话,一个工作人员,它执行计算 returning 以某种方便的形式执行的任何结果。这意味着这些工作会话包含 none 原始会话中的对象,除非提供(使用 .export 或作为函数参数)。 future
、promises
和 ipc
包允许异步处理,同时在原始会话中修改变量,但代价是一些简单性。
请注意,由于每个会话都将使用一个单独的内核,因此使用的会话多于内核会降低整体性能。
至于实现本身,您想如何实现并行化取决于您的计算需要什么以及您想要哪种格式 return。如果你想执行简单的行式计算,你可以使用类似的东西:
library(iterators)
cl=parallel::makeCluster(4) #I have 8 core 16 threads but use 14 here. Please edit this accordingly.
registerDoParallel(cl)
stopCluster(cl)
home1 <- function(zz1){
output <- foreach(x = iter(zz1, by = "row"), .combine = rbind) %dopar% {
x[["Revenue3"]] <- sqrt(x[["Revenue1"]]) + x[["Revenue2"]]
x
}
output
}
zzz <- home1(zz1=zz)
stopCluster(cl)
请注意,我在这里使用了 Iterator 结构,可用于高效地迭代 rows/columns。如果这是您正在寻找的计算类型,我建议您采用矢量化方法,因为这将大大提高性能。
zz[["Revenue3"]] <- sqrt(zz[["Revenue2"]) + zz[["Revenue1"]]
后者在我的微型 4 核笔记本电脑上大约快 13000 倍。