在每个计算中并行使用标准 R 闪亮进度条

Using standard R shiny progress bar in parallel foreach calculations

我正在尝试使用 doParallel 后端在并行 foreach 循环中使用标准 R 闪亮进度条。但是,这会导致以下错误消息:

Warning: Error in {: task 1 failed - "'session' is not a ShinySession object."

代码(最小工作示例)

library(shiny)
library(doParallel)

ui <- fluidPage(
  actionButton(inputId = "go", label = "Launch calculation")
)

server <- function(input, output, session) {

  workers=makeCluster(2)
  registerDoParallel(workers)

  observeEvent(input$go, {
    Runs=c(1:4)
    Test_out=foreach(i=Runs, .combine=cbind, .inorder=TRUE, .packages=c("shiny"),.export=c("session")) %dopar% { 
      pbShiny = shiny::Progress$new()
      pbShiny <- Progress$new(session,min = 0, max = 10)
      on.exit(pbShiny$close())
      test_vec=rep(0,100)

      for(i in 1:10){
        test_vec=test_vec+rnorm(100)
        pbShiny$set(message="Simulating",detail=paste(i),
                  value=i)
        Sys.sleep(0.2)
      }

    }
  })
}

shinyApp(ui = ui, server = server)

代码 运行 如果我按顺序 运行 foreach 循环(使用 registerDoSEQ())。 有谁知道如何解决这个问题?


总体目标


下面link下有一个类似的问题,但由于没有提供工作示例而没有得到解决:

Utilizing parallel foreach for progress bar in R Shiny

doParallel 包是并行包的扩展,如此处的文档所示。

https://cran.r-project.org/web/packages/doParallel/doParallel.pdf

阅读并行包的文档,我们看到它实现了 3 种不同的方法来实现并行性。请记住 R 是一种单线程语言。

  1. 父进程与工作进程或子进程通信的新 R 会话。
  2. 通过分叉
  3. 使用OS级设施

您可以在此处找到此信息,

https://stat.ethz.ch/R-manual/R-devel/library/parallel/doc/parallel.pdf

这样做的结果是子进程在完成计算和 returns 一个值之前无法与父进程通信。这是据我所知。

因此,无法在工作进程中勾选进度条。

完全公开,我没有使用过 doParallel 包,关于 shiny 的文档也很有限。


备选方案

有一个类似的包,但是有大量关于 shiny 的文档。这些是 futurespromisesipc 包。 futurespromises 启用异步编程,而 ipc 启用进程间通信。为了进一步帮助我们,它还有一个 AsyncProgress() 功能。

这是一个我们同步勾选两个计数器的例子。

例子

library(shiny)
library(future)
library(promises)
library(ipc)

plan(multisession)


ui <- fluidPage(
  actionButton(inputId = "go", label = "Launch calculation")
)

server <- function(input, output, session) {

  observeEvent(input$go, {

    progress = AsyncProgress$new(message="Complex analysis")

    future({
      for (i in 1:15) {
        progress$inc(1/15)
        Sys.sleep(0.5)
      }

      progress$close()
      return(i)
    })%...>%
      cat(.,"\n")

    Sys.sleep(1)

    progress2 = AsyncProgress$new(message="Complex analysis")

    future({
      for (i in 1:5) {
        progress2$inc(1/5)
        Sys.sleep(0.5)
      }

      progress2$close()

      return(i)
    })%...>%
      cat(.,"\n")

    NULL
  })
}

shinyApp(ui = ui, server = server)

您的代码已修改

这是您编写的代码,稍作修改以分离出许多异步进程。任何工作都可以在 worker 中执行,例如您创建的向量并添加一个 rnorm。 (此处未显示)

library(shiny)
library(future)
library(promises)
library(ipc)

plan(multisession)

ui <- fluidPage(
  actionButton(inputId = "go", label = "Launch calculation")
)

server <- function(input, output, session) {

  observeEvent(input$go, {
    Runs=c(1:4) #define the number of runs
    progress = list() #A list to maintain progress for each run

    for(j in Runs){
      progress[[j]] = AsyncProgress$new(message="Complex analysis")
      future({
        for (i in 1:10) {
          progress[[j]]$inc(1/10)
          Sys.sleep(0.2)
        }
        progress[[j]]$close()
        return(i)
    })%...>%
        cat(.,'\n')
    }

    NULL
  })
}

shinyApp(ui = ui, server = server)

上面的代码是在此处的 ipc 文档中找到的代码的修改版本:

http://htmlpreview.github.io/?https://github.com/fellstat/ipc/blob/master/inst/doc/shinymp.html

其他资源:

https://rstudio.github.io/promises/articles/overview.html

我想我找到了解决运行次数超过核心数的情况的方法。

我搜索了嵌套的未来流程并找到了以下页面:

https://cran.r-project.org/web/packages/future/vignettes/future-3-topologies.html

我更改了我的代码如下。这会按内核顺序运行作业并相应地更新相应的进度条。

library(shiny)
library(future)
library(promises)
library(ipc)
library(listenv)

plan(list(multiprocess, sequential))

ui <- fluidPage(
  actionButton(inputId = "go", label = "Launch calculation")
)

server <- function(input, output, session) {

  observeEvent(input$go, {
    x <- listenv()
    Runs=12 #define the number of runs
    N=availableCores()
    Tasks=rep(0,N) #Number of sequential tasks per core
    Tasks[1:(Runs-(ceiling(Runs/N)-1)*N)]=ceiling(Runs/N)
    if((Runs-(ceiling(Runs/N)-1)*N)<N){
      Tasks[(Runs-(ceiling(Runs/N)-1)*N+1):N]=ceiling(Runs/N)-1
    }

    progress = list() #A list to maintain progress for each run

    for(j in 1:N){

      for(l in 1:Tasks[j]){
        progress[[(l-1)*N+j]] = AsyncProgress$new(message=paste("Complex analysis, core ",j," , task ",l))
      }

    x[[j]]%<-%{
      for(l in 1:Tasks[j]){
        for (i in 1:10) {
          progress[[(l-1)*N+j]]$inc(1/10)
          Sys.sleep(0.5)
        }
        progress[[(l-1)*N+j]]$close()
      }
    }
    }

    NULL
  })
}

shinyApp(ui = ui, server = server)