如何 fork/parallelize 在 purrr::pmap 中处理
How to fork/parallelize process in purrr::pmap
我有以下代码使用 purr::pmap
进行串行处理
library(tidyverse)
set.seed(1)
params <- tribble(
~mean, ~sd, ~n,
5, 1, 1,
10, 5, 3,
-3, 10, 5
)
params %>%
pmap(rnorm)
#> [[1]]
#> [1] 4.373546
#>
#> [[2]]
#> [1] 10.918217 5.821857 17.976404
#>
#> [[3]]
#> [1] 0.2950777 -11.2046838 1.8742905 4.3832471 2.7578135
如何并行化(分叉)上述过程,使其运行得更快并产生相同的结果?
在这里,我使用 rnorm
进行说明,实际上我有一个功能可以完成繁重的工作。它需要并行化。
我对非 purrr(非 tidyverse)解决方案持开放态度,只要它在给定 rnorm
函数和 params
作为输入的情况下产生相同的结果。
简而言之:"parallel pmap()
",允许与 pmap()
类似的语法,可能看起来像:lift(mcmapply)()
或 lift(clusterMap)()
.
如果您不在 Windows,您可以:
library(parallel)
# forking
set.seed(1, "L'Ecuyer")
params %>%
lift(mcmapply, mc.cores = detectCores() - 1)(FUN = rnorm)
# [[1]]
# [1] 4.514604
#
# [[2]]
# [1] 0.7022156 0.8734875 5.0250478
#
# [[3]]
# [1] 8.7704060 11.7217925 -12.8776289 -10.7466152 0.5177089
编辑
这里有一个 "cleaner" 选项,感觉应该更像使用 pmap
:
nc <- max(parallel::detectCores() - 1, 1L)
par_pmap <- function(.l, .f, ..., mc.cores = getOption("mc.cores", 2L)) {
do.call(
parallel::mcmapply,
c(.l, list(FUN = .f, MoreArgs = list(...), SIMPLIFY = FALSE, mc.cores = mc.cores))
)
}
f <- function(n, mean, sd, ...) rnorm(n, mean, sd)
params %>%
par_pmap(f, some_other_arg_to_f = "foo", mc.cores = nc)
如果您在 Windows(或任何其他 OS),您可以:
library(parallel)
# (Parallel SOCKet cluster)
cl <- makeCluster(detectCores() - 1)
clusterSetRNGStream(cl, 1)
params %>%
lift(clusterMap, cl = cl)(fun = rnorm)
# [[1]]
# [1] 5.460811
#
# [[2]]
# [1] 7.573021 6.870994 5.633097
#
# [[3]]
# [1] -21.595569 -21.253025 -12.949904 -4.817278 -7.650049
stopCluster(cl)
如果您更倾向于使用 foreach
,您可以:
library(doParallel)
# (fork by default on my Linux machine, should PSOCK by default on Windows)
registerDoParallel(cores = detectCores() - 1)
set.seed(1, "L'Ecuyer")
lift(foreach)(params) %dopar%
rnorm(n, mean, sd)
# [[1]]
# [1] 4.514604
#
# [[2]]
# [1] 0.7022156 0.8734875 5.0250478
#
# [[3]]
# [1] 8.7704060 11.7217925 -12.8776289 -10.7466152 0.5177089
stopImplicitCluster()
我有以下代码使用 purr::pmap
进行串行处理
library(tidyverse)
set.seed(1)
params <- tribble(
~mean, ~sd, ~n,
5, 1, 1,
10, 5, 3,
-3, 10, 5
)
params %>%
pmap(rnorm)
#> [[1]]
#> [1] 4.373546
#>
#> [[2]]
#> [1] 10.918217 5.821857 17.976404
#>
#> [[3]]
#> [1] 0.2950777 -11.2046838 1.8742905 4.3832471 2.7578135
如何并行化(分叉)上述过程,使其运行得更快并产生相同的结果?
在这里,我使用 rnorm
进行说明,实际上我有一个功能可以完成繁重的工作。它需要并行化。
我对非 purrr(非 tidyverse)解决方案持开放态度,只要它在给定 rnorm
函数和 params
作为输入的情况下产生相同的结果。
简而言之:"parallel pmap()
",允许与 pmap()
类似的语法,可能看起来像:lift(mcmapply)()
或 lift(clusterMap)()
.
如果您不在 Windows,您可以:
library(parallel)
# forking
set.seed(1, "L'Ecuyer")
params %>%
lift(mcmapply, mc.cores = detectCores() - 1)(FUN = rnorm)
# [[1]]
# [1] 4.514604
#
# [[2]]
# [1] 0.7022156 0.8734875 5.0250478
#
# [[3]]
# [1] 8.7704060 11.7217925 -12.8776289 -10.7466152 0.5177089
编辑
这里有一个 "cleaner" 选项,感觉应该更像使用 pmap
:
nc <- max(parallel::detectCores() - 1, 1L)
par_pmap <- function(.l, .f, ..., mc.cores = getOption("mc.cores", 2L)) {
do.call(
parallel::mcmapply,
c(.l, list(FUN = .f, MoreArgs = list(...), SIMPLIFY = FALSE, mc.cores = mc.cores))
)
}
f <- function(n, mean, sd, ...) rnorm(n, mean, sd)
params %>%
par_pmap(f, some_other_arg_to_f = "foo", mc.cores = nc)
如果您在 Windows(或任何其他 OS),您可以:
library(parallel)
# (Parallel SOCKet cluster)
cl <- makeCluster(detectCores() - 1)
clusterSetRNGStream(cl, 1)
params %>%
lift(clusterMap, cl = cl)(fun = rnorm)
# [[1]]
# [1] 5.460811
#
# [[2]]
# [1] 7.573021 6.870994 5.633097
#
# [[3]]
# [1] -21.595569 -21.253025 -12.949904 -4.817278 -7.650049
stopCluster(cl)
如果您更倾向于使用 foreach
,您可以:
library(doParallel)
# (fork by default on my Linux machine, should PSOCK by default on Windows)
registerDoParallel(cores = detectCores() - 1)
set.seed(1, "L'Ecuyer")
lift(foreach)(params) %dopar%
rnorm(n, mean, sd)
# [[1]]
# [1] 4.514604
#
# [[2]]
# [1] 0.7022156 0.8734875 5.0250478
#
# [[3]]
# [1] 8.7704060 11.7217925 -12.8776289 -10.7466152 0.5177089
stopImplicitCluster()