我可以同时使用并行版本的for循环和apply family吗?

Can I use the parallel version of for loop and apply family together?

最近在研究的时候遇到了一个问题:一开始我定义了一个函数myfunction,里面有两个for循环,然后我用lapply(datalist, myfunction),但是处理的是太慢了。

然后学习了两个并行包'foreach'和'parallel'做并行计算。所以我将这两个过程都改为它们的并行版本。

但是我发现当我 运行 我的代码时,我的函数中的 foreach 似乎不起作用。

myfunction <- function{data} {

   df  <- foreach (i = 1:200, .combine = "rbind") %:% 
    foreach(j = 1:200, .combine = "rbind") %dopar% {

      *****
      process
      *****
    }

  data <- df[1,1]
  return(data)
}

system.time({

  cl <- detectCores()
  cl <- makeCluster(cl)
  registerDoParallel(cl)

  mat <- t(parSapply(cl, list, myfuntion))

  stopCluster(cl)

}) 

我觉得是因为 parSapply 占用了整个核心所以 foreach 没有额外的核心来计算。有什么好主意可以解决吗?基本上我想在它们的并行版本中实现两个进程 运行ning。

另一个问题是:假设我们只能选择一个进程进行并行计算,我应该选择哪个? for 循环还是 apply 系列?

非常感谢:)

I feel like it's due to the parSapply occupied the whole cores so foreach don't have additional cores to compute. Is there any good idea to fix it? Basically I want to achieve both two processes running in their parallel versions.

不,这不是个好主意。您基本上是在尝试过度并行化(但这确实发生在您的代码中,如下所述)。

Another problem is: suppose we can only choose one process to do the parallel computation, which one should I choose? The for loop or apply family?

没有一个正确答案。我建议您对 *** process *** 代码进行剖析,以确定它从并行化中获得了多少收益。

所以,我发现您的 parSapply(cl, ...)foreach() %dopar% { ... } 之上使用相同的集群 cl 很有趣。我第一次以这种方式看到 asked/proposed。你肯定不想这样做,但 question/attempt 并不疯狂。当 foreach() %dopar% { ... } 尝试使用它们时,您的直觉认为所有 worker 都会被占用是部分正确的。然而,真正发生的是 foreach() %dopar% { ... } 语句是在 workers 中计算的,而不是在定义集群 cl 的主 R 会话中计算的。在 worker 上,没有注册 foreach 适配器,因此这些调用将默认为顺序处理 (== foreach::registerDoSEQ())。要实现嵌套并行化,您必须在每个工作人员中设置和注册一个集群,例如在 myfunction() 函数中。

作为 future 框架的作者,我想建议您利用它。它会保护您免受上述错误的影响,并且它也不会过度并行(如果您真的很想这样做,您可以这样做)。以下是我将如何重写您的代码示例:

library(foreach) ## foreach() and %dopar%

myfunction <- function{data} {
   df  <- foreach(i = 1:200, .combine = "rbind") %:% 
    foreach(j = 1:200, .combine = "rbind") %dopar% {
      *****
      process
      *****
    }

  data <- df[1,1]
  return(data)
}


## Tell foreach to parallelize via the future framework
doFuture::registerDoFuture()

## Have the future framework parallelize using a cluster of
## local workers (similar to makeCluster(detectCores()))
library(future)
plan(multisession)

library(future.apply) ## future_sapply()

system.time({
  mat <- t(future_sapply(list, myfuntion))
})

现在,重要的是要了解外部 future_sapply() 并行化将在 'multisession' 集群上运行。当你到达内部 foreach() %dopar% { ... } 并行化时,foreach 看到的只是一个顺序工作程序,因此内层将被并行处理。这就是我的意思,未来的框架会自动保护你免受过度并行化的影响。

如果你想让内层在 'multisession' 集群上并行化,而外层是顺序的,你可以将其设置为:

plan(list(sequential, multisession))

如果你真的想做嵌套并行化,比如两个外层worker和4个内层worker,你可以使用:

plan(list(tweak(multisession, workers = 2), tweak(multisession, workers = 4))

这将同时 运行 2*4 = 8 个并行 R 进程。

更有用的是,当您有多台机器可用时,您可以将这些机器用于外层,然后在每台机器上使用多会话集群。类似于:

plan(list(tweak(cluster, workers = c("machine1", "machine2")), multisession))

您可以在 future 小插图中阅读更多相关信息。