为什么在 运行 并行时 "foreach" 不导出到我的可变对象?

Why does "foreach" not export to my mutable object when running in parallel?

我正在 R 中建立一个患者级别的模拟模型。它需要随着时间的推移(使用两个内部循环)为每个患者(有和没有治疗)生成两个数据帧。然后我需要为模型中所需的每个患者循环内部循环。然后将内部循环的结果存储在全局环境中的列表中。

为了尝试加快进程,我想 运行 使用 foreach 包并行处理外部循环。使用 %do% 时循环按预期工作(不是 运行 并行循环)。但是,一旦我将它并行设置为 %dopar% 到 运行,内部循环就不再导出到全局环境中的列表,我收到错误消息:

Error in { : task 1 failed - "object 'Patient_Data' not found"

我在下面提供了代码,其中包含我的外循环函数的 %do%%dopar% 版本的工作示例。内部循环已从示例中删除,仅替换为简单的概率抽取。

如有任何帮助,我们将不胜感激。


library(tidyverse)
library(foreach)
library(doSNOW)

# Input
rm(list = ls())
Patient_Number <- 1000

#### Create a place to store patient data generated during the simulation ####

Patient_Data <- vector("list", length = Patient_Number)


#### Function - Non-parallel ####

Run_Sim <- function(){

  cl <- makeCluster(4, type = "SOCK")
  registerDoSNOW(cl)

  # record the time the model started

  model_start <- Sys.time()

  print(noquote(paste("Time model started: ", format(Sys.time(), "%a %d %b %Y %X"), sep = "")))

  #### Simulate Patient's BCVA scores ####

  # create progress bar

  print(noquote("Simulating Patients:"))

  pb <- txtProgressBar(min = 0, max = Patient_Number, style = 3)
  progress <- function(n) setTxtProgressBar(pb, n)
  opts <- list(progress = progress)

  foreach(i = 1:Patient_Number, .packages = c("tidyverse"), .inorder = FALSE,
          .export = ls(globalenv()),
          .options.snow = opts) %do% {

            This_Patient <- list(
              Patient_ID = 0,
              Intervention = 0,
              Comparator = 0
            )

            This_Patient_Draw_Int <- rnorm(1, mean = 50, sd = 7.8) # These normally would be more complex functions generating a data frame for each patient
            This_Patient_Draw_Comp <- rnorm(1, mean = 44, sd = 10) # These normally would be more complex functions generating a data frame for each patient

            This_Patient$Patient_ID <- i
            This_Patient$Intervention <- This_Patient_Draw_Int
            This_Patient$Comparator <- This_Patient_Draw_Comp

            Patient_Data[[i]] <<- This_Patient

          }

  # stop the progress bar

  close(pb)

  # record when model finished

  model_finish <- Sys.time()
  print(noquote(paste("Time model finished: ", format(Sys.time(), "%a %d %b %Y %X"), sep = "")))

  print(noquote(paste("Model took ", round(difftime(model_finish, model_start, units = c("mins")), 0),
                      " minute(s) to simulate ", Patient_Number, " Patients", sep = "")))

  stopCluster(cl)

}

Run_Sim()


#### Parallel version using foreach %dopar% ####

rm(list = ls())
Patient_Number <- 1000
Patient_Data <- vector("list", length = Patient_Number)

Run_Sim_Para <- function(){

  cl <- makeCluster(4, type = "SOCK")
  registerDoSNOW(cl)

  # record the time the model started

  model_start <- Sys.time()

  print(noquote(paste("Time model started: ", format(Sys.time(), "%a %d %b %Y %X"), sep = "")))

  #### Simulate Patient's BCVA scores ####

  # create progress bar

  print(noquote("Simulating Patients:"))

  pb <- txtProgressBar(min = 0, max = Patient_Number, style = 3)
  progress <- function(n) setTxtProgressBar(pb, n)
  opts <- list(progress = progress)

  foreach(i = 1:Patient_Number, .packages = c("tidyverse"), .inorder = FALSE,
          .export = ls(globalenv()),
          .options.snow = opts) %dopar% {

            This_Patient <- list(
              Patient_ID = 0,
              Intervention = 0,
              Comparator = 0
            )

            This_Patient_Draw_Int <- rnorm(1, mean = 50, sd = 7.8) # These normally would be more complex functions generating a data frame for each patient
            This_Patient_Draw_Comp <- rnorm(1, mean = 44, sd = 10) # These normally would be more complex functions generating a data frame for each patient

            This_Patient$Patient_ID <- i
            This_Patient$Intervention <- This_Patient_Draw_Int
            This_Patient$Comparator <- This_Patient_Draw_Comp

            Patient_Data[[i]] <<- This_Patient

          }

  # stop the progress bar

  close(pb)

  # record when model finished

  model_finish <- Sys.time()
  print(noquote(paste("Time model finished: ", format(Sys.time(), "%a %d %b %Y %X"), sep = "")))

  print(noquote(paste("Model took ", round(difftime(model_finish, model_start, units = c("mins")), 0),
                      " minute(s) to simulate ", Patient_Number, " Patients", sep = "")))

  stopCluster(cl)

}

Run_Sim_Para()

我已经通过以下操作解决了这个问题;

  1. 创建一个将内部循环编译成列表的单独函数
  2. 然后将此列表函数传递给 foreach 函数
  3. assign 函数不是使用可变状态来更新全局环境中已经存在的列表,而是用于将 foreach 循环的输出传递给名为 [=26= 的对象] 在全球环境中

示例代码如下。希望这可以帮助其他可能遇到类似问题的人。

library(tidyverse)
library(foreach)
library(doSNOW)

# Input
rm(list = ls())
Patient_Number <- 1e4

#### Create a listing function which will be ran through "foreach" ####

list_func <- function(Patient_ID_Code){

  This_Patient <- list(
    Patient_ID = 0,
    Intervention = 0,
    Comparator = 0
  )

  This_Patient_Draw_Int <- rnorm(1, mean = 50, sd = 7.8) # These normally would be more complex functions generating a data frame for each patient
  This_Patient_Draw_Comp <- rnorm(1, mean = 44, sd = 10) # These normally would be more complex functions generating a data frame for each patient

  This_Patient$Patient_ID <- Patient_ID_Code
  This_Patient$Intervention <- This_Patient_Draw_Int
  This_Patient$Comparator <- This_Patient_Draw_Comp

  return(This_Patient)


}


Run_Sim_Para <- function(){

  cl <- parallel::makeCluster(parallel::detectCores() - 1)
  registerDoSNOW(cl)

  # record the time the model started

  model_start <- Sys.time()

  print(noquote(paste("Time model started: ", format(Sys.time(), "%a %d %b %Y %X"), sep = "")))

  #### Simulate Patient's BCVA scores ####

  # create progress bar

  print(noquote("Simulating Patients:"))

  pb <- txtProgressBar(min = 0, max = Patient_Number, style = 3)
  progress <- function(n) setTxtProgressBar(pb, n)
  opts <- list(progress = progress)

  test <- foreach(i = 1:Patient_Number, .packages = c("tidyverse"),
                  .export = ls(.GlobalEnv),
                  .options.snow = opts) %dopar% {

                    list_func(i)

                  }

  # stop the progress bar

  close(pb)

  # record when model finished

  model_finish <- Sys.time()
  print(noquote(paste("Time model finished: ", format(Sys.time(), "%a %d %b %Y %X"), sep = "")))

  print(noquote(paste("Model took ", round(difftime(model_finish, model_start, units = c("mins")), 0),
                      " minute(s) to simulate ", Patient_Number, " Patients", sep = "")))

  stopCluster(cl)

  assign("Patient_Data", test, envir = .GlobalEnv)

}

Run_Sim_Para()