Why do my second-level futures execute serially?

I am trying to reproduce the example from the topology vignette of the future package. Quoting:

Futures can be nested in R such that one future creates another set of futures and so on. This may, for instance, occur within nested for loops [...]

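In one part, the author uses plan(list(multicore, multicore)) (further arguments and tweak omitted) to process two futures in parallel, each of which in turn processes four futures in parallel. That should amount to eight futures being processed in parallel.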

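However, when I try to reproduce this with the code below, I see that the second-level futures are processed sequentially. What am I doing wrong?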

MCVE

library(future)
library(ggplot2)
plan(list(multiprocess, multiprocess))


# Run for a random amount of time and return start and stop time
startStop <- function(){
  start <- Sys.time()
  x <- runif(1, 1, 3)
  Sys.sleep(x)
  stop <- Sys.time()
  return(data.frame(start = start, stop = stop))
}

nGrp <- 3
nCV <- 4

l <- rep(list(NULL), nGrp)


for(i in seq_along(l)){
  l[[i]] <- future({
    m <- rep(list(NULL), nCV)
    for(j in seq_along(m)){
      m[[j]] <- future(startStop())
    }
    m <- lapply(m, value)
    m <- do.call(rbind, m)
    m
  })
}
l <- lapply(l, value)
d <- do.call(rbind, l)
d$iGrp <- rep(seq_len(nGrp), each = nCV)
d$iCV <- rep(seq_len(nCV), times = nGrp)

d$x <- paste(d$iGrp, d$iCV, sep = "_")
d$iGrp <- as.character(d$iGrp)
ggplot(d, aes(x = x, ymin = start, ymax = stop, color = iGrp)) + geom_linerange() + coord_flip()
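
As a numeric complement to the plot, the following sketch counts how many tasks in each group were running at the same instant. The helper name maxConcurrency is hypothetical (it is not part of the future package), and it assumes a data frame with the columns start, stop and iGrp as built above. A synthetic data frame is used here so the snippet runs on its own.

```r
# Hypothetical helper (not part of future): for each group, return the largest
# number of tasks that were active at the same time.
maxConcurrency <- function(d) {
  sapply(split(d, d$iGrp), function(g) {
    # +1 at every start, -1 at every stop; the peak of the running sum is the
    # maximum number of simultaneously active tasks.
    times  <- c(g$start, g$stop)
    deltas <- c(rep(1L, nrow(g)), rep(-1L, nrow(g)))
    # Sort by time; on ties, stops (-1) are handled before starts (+1), so
    # back-to-back tasks do not count as overlapping.
    ord <- order(times, deltas)
    max(cumsum(deltas[ord]))
  })
}

# Synthetic timings: group "1" runs its tasks back to back,
# group "2" runs one task inside the time span of the other.
t0 <- as.POSIXct("2018-01-01 00:00:00", tz = "UTC")
demo <- data.frame(
  iGrp  = c("1", "1", "2", "2"),
  start = t0 + c(0, 2, 0, 1),
  stop  = t0 + c(2, 4, 3, 2)
)
res <- maxConcurrency(demo)
print(res)  # group "1" gives 1, group "2" gives 2
```

Applied to d from the code above, a value of 1 for every group would indicate that the inner futures ran one after another, while a value close to nCV would indicate that they overlapped.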

Expected

Session info

R version 3.4.3 (2017-11-30)
Platform: x86_64-pc-linux-gnu (64-bit)
Running under: CentOS Linux 7 (Core)

Matrix products: default
BLAS: /opt/Bio/R/3.4.3/lib64/R/lib/libRblas.so
LAPACK: /opt/Bio/R/3.4.3/lib64/R/lib/libRlapack.so

locale:
 [1] LC_CTYPE=en_US.UTF-8       LC_NUMERIC=C              
 [3] LC_TIME=en_US.UTF-8        LC_COLLATE=en_US.UTF-8    
 [5] LC_MONETARY=en_US.UTF-8    LC_MESSAGES=en_US.UTF-8   
 [7] LC_PAPER=en_US.UTF-8       LC_NAME=C                 
 [9] LC_ADDRESS=C               LC_TELEPHONE=C            
[11] LC_MEASUREMENT=en_US.UTF-8 LC_IDENTIFICATION=C       

attached base packages:
[1] stats     graphics  grDevices utils     datasets  methods   base     

other attached packages:
[1] ggplot2_2.2.1 future_1.8.1 

loaded via a namespace (and not attached):
 [1] Rcpp_0.12.17     devtools_1.13.4  munsell_0.4.3    colorspace_1.3-2
 [5] R6_2.2.2         rlang_0.1.6      httr_1.3.1       plyr_1.8.4      
 [9] globals_0.11.0   tools_3.4.3      parallel_3.4.3   grid_3.4.3      
[13] gtable_0.2.0     git2r_0.21.0     withr_2.1.1      yaml_2.1.16     
[17] lazyeval_0.2.1   digest_0.6.15    tibble_1.4.2     codetools_0.2-15
[21] curl_3.1         memoise_1.1.0    compiler_3.4.3   pillar_1.1.0    
[25] scales_0.5.0     listenv_0.7.0 

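Author of future here: This happens because there is a built-in protection against nested parallelism. Without it, you would overload your computer with too many parallel processes, which would not only cause it to overheat but also slow down overall performance.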

I have updated the 'Future Topologies' vignette for the next release with the following section:

Built-in protection against recursive parallelism

Above we have processed either the outer or the inner set of future in parallel. What if we want to process both layers in parallel? It's tempting to use:

plan(list(multiprocess, multiprocess))

Although this does not give an error, we will find that the inner layer of futures will be processed sequentially just as if we would use plan(list(multiprocess, sequential)). This behavior is due to the built-in protection against nested parallelism. If both layers would run in parallel, each using the 8 cores available on the machine, we would be running 8 * 8 = 64 parallel processes - that would for sure overload our computer. What happens internally is that for the outer layer, availableCores() equals eight (8), whereas for the inner layer it equals one (1).

Now, we could imagine that we process the outer layer with, say, two parallel futures, and then the inner layer with four parallel futures. In that case, we would end up running on at most eight cores (= 2 * 4). This can be achieved by forcing a fixed number of workers at each layer (not recommended):

plan(list(tweak(multiprocess, workers = 2), tweak(multiprocess, workers = 4)))
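
The protection described above can be observed directly by asking each layer what it sees. The sketch below is a minimal check, not the vignette's own code: it uses multisession in place of multiprocess (an assumption, since later releases of future deprecated multiprocess) and caps the outer layer at two workers only to keep the demo light.

```r
library(future)

# Assumption: multisession stands in for multiprocess.
# The inner layer is left at its default, so the built-in protection applies.
plan(list(tweak(multisession, workers = 2), multisession))

# What the main R session (outer layer) sees
outer_cores   <- availableCores()
outer_workers <- nbrOfWorkers()

# What a first-level future (inner layer) sees
f <- future({
  list(cores   = future::availableCores(),
       workers = future::nbrOfWorkers())
})
inner <- value(f)

str(list(outer_cores = outer_cores,
         outer_workers = outer_workers,
         inner = inner))

# Return to sequential processing and shut down the background workers
plan(sequential)
```

If the protection works as the section above describes, the inner layer should report a single core and a single worker regardless of how many cores the machine has, which is why second-level futures created there resolve one at a time.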

If you want parallel processing to behave the way you expected, future.callr is an option. Just use:

library(future.callr)
plan(list(callr, callr))