使用期货时,R-Shiny App 无法正确更新进度条

R-Shiny App does not update progress bar correctly when using futures

我有一个闪亮的应用程序,它应该 运行 在多核上进行计算,同时通过进度条提供反馈。只要我不进一步处理期货的结果,这就可以正常工作(请参见下面的工作示例)。一旦我之后使用结果,进度条不会更新,直到所有期货都完成。

我使用包 futurepromisesipc 进行进程间通信。我认为问题是 R 想要在结果一出来就继续使用 futures。我试图用 resolved() 或 resolve() 之类的命令停止算法,但没有任何进展。

library(shiny)
library(future)
library(promises)
library(ipc)

plan(list(multiprocess, sequential))

ui <- fluidPage(
    actionButton(inputId = "go", label = "Launch calculation")
)

server <- function(input, output, session) {

    observeEvent(input$go, {
        x <- list()
        N = availableCores()
        Tasks = rep(10, N) #Number of sequential tasks per core

        progress = list() #A list to maintain progress for each run

        resultsvec <- c()
        for(j in 1:N){

            progress[[j]] = AsyncProgress$new(message = paste("analysis, core ",j))

            x[[j]] <- future({
                for(l in 1:Tasks[j]){
                    progress[[j]]$inc(1/Tasks[j])
                    resultsvec <- append(resultsvec, l)
                    Sys.sleep(1)
                }
                resultsvec
                progress[[j]]$close()
            })
        }
        result <- lapply(x, value)
        #... do stuff with result
    })
}

shinyApp(ui = ui, server = server)

这是正确更新进度条的服务器函数。

server <- function(input, output, session) {

    observeEvent(input$go, {
        x <- list()
        N = availableCores()
        Tasks = rep(10, N) #Number of sequential tasks per core
        progress = list() #A list to maintain progress for each run

        for(j in 1:N){

            progress[[j]] = AsyncProgress$new(message = paste("analysis, core ",j))

            x[[j]] <- future({
                for(l in 1:Tasks[j]){
                    progress[[j]]$inc(1/Tasks[j])
                    Sys.sleep(1)
                }
                progress[[j]]$close()
            })
        }
    })
}

我设法解决了我需要的问题,尽管该解决方案不再使用期货。我切换到 doSNOW 包。但据我所知,除了允许进程间通信的 future/promises 之外,doSNOW 或其他并行包中没有其他选项。所以这是我的解决方法。与上面相反,我在整个过程中使用了一个进度条。

library(shiny)
library(doSNOW)

ui <- fluidPage(
    actionButton(inputId = "go", label = "Launch calculation")
)

server <- function(input, output, session) {

    observeEvent(input$go, {

        Tasks <- 40 #now total tasks to do
        runs <- 10 #splitting of progress bar. 10 means every 10% it gets updated. 20 every 5% etc.

        taskvec <- rep(Tasks %/% runs, runs)

        if (Tasks %% runs != 0){
            taskvec[1:(Tasks %% runs)] <- taskvec[1:(Tasks %% runs)] + 1
        }

        resultsvec <- c()

        cl <- makeCluster(2)
        registerDoSNOW(cl)

        withProgress(message = "Analysis", value = 0,{
            for (j in 1:runs) {

                resultsvec_sub <- foreach(i = 1:taskvec[j], 
                                          .combine = append) %dopar% {
                                              f <- i
                                              Sys.sleep(1)
                                              return(f)
                                          }
                resultsvec <- append(resultsvec, resultsvec_sub)
                incProgress(1/runs)
            }
        })
        stopCluster(cl)
        #do stuff with resultsvec..
    })
}

shinyApp(ui = ui, server = server)

如您所见,我在将任务分配给核心之前拆分了任务,并在所有核心上完成每次拆分后更新进度条。这当然效率更低,因为当拆分中的几乎所有任务都已完成时。一些核心可能处于空闲状态,直到其他核心完成并开始下一次拆分。 可以改进任务的拆分 process/distribution,但它现在正在工作。