使用期货时,R-Shiny App 无法正确更新进度条
R-Shiny App does not update progress bar correctly when using futures
我有一个闪亮的应用程序,它应该 运行 在多核上进行计算,同时通过进度条提供反馈。只要我不进一步处理期货的结果,这就可以正常工作(请参见下面的工作示例)。一旦我之后使用结果,进度条不会更新,直到所有期货都完成。
我使用包 future
、promises
和 ipc
进行进程间通信。我认为问题是 R 想要在结果一出来就继续使用 futures。我试图用 resolved() 或 resolve() 之类的命令停止算法,但没有任何进展。
library(shiny)
library(future)
library(promises)
library(ipc)
plan(list(multiprocess, sequential))
ui <- fluidPage(
actionButton(inputId = "go", label = "Launch calculation")
)
server <- function(input, output, session) {
observeEvent(input$go, {
x <- list()
N = availableCores()
Tasks = rep(10, N) #Number of sequential tasks per core
progress = list() #A list to maintain progress for each run
resultsvec <- c()
for(j in 1:N){
progress[[j]] = AsyncProgress$new(message = paste("analysis, core ",j))
x[[j]] <- future({
for(l in 1:Tasks[j]){
progress[[j]]$inc(1/Tasks[j])
resultsvec <- append(resultsvec, l)
Sys.sleep(1)
}
resultsvec
progress[[j]]$close()
})
}
result <- lapply(x, value)
#... do stuff with result
})
}
shinyApp(ui = ui, server = server)
这是正确更新进度条的服务器函数。
server <- function(input, output, session) {
observeEvent(input$go, {
x <- list()
N = availableCores()
Tasks = rep(10, N) #Number of sequential tasks per core
progress = list() #A list to maintain progress for each run
for(j in 1:N){
progress[[j]] = AsyncProgress$new(message = paste("analysis, core ",j))
x[[j]] <- future({
for(l in 1:Tasks[j]){
progress[[j]]$inc(1/Tasks[j])
Sys.sleep(1)
}
progress[[j]]$close()
})
}
})
}
我设法解决了我需要的问题,尽管该解决方案不再使用期货。我切换到 doSNOW
包。但据我所知,除了允许进程间通信的 future/promises
之外,doSNOW
或其他并行包中没有其他选项。所以这是我的解决方法。与上面相反,我在整个过程中使用了一个进度条。
library(shiny)
library(doSNOW)
ui <- fluidPage(
actionButton(inputId = "go", label = "Launch calculation")
)
server <- function(input, output, session) {
observeEvent(input$go, {
Tasks <- 40 #now total tasks to do
runs <- 10 #splitting of progress bar. 10 means every 10% it gets updated. 20 every 5% etc.
taskvec <- rep(Tasks %/% runs, runs)
if (Tasks %% runs != 0){
taskvec[1:(Tasks %% runs)] <- taskvec[1:(Tasks %% runs)] + 1
}
resultsvec <- c()
cl <- makeCluster(2)
registerDoSNOW(cl)
withProgress(message = "Analysis", value = 0,{
for (j in 1:runs) {
resultsvec_sub <- foreach(i = 1:taskvec[j],
.combine = append) %dopar% {
f <- i
Sys.sleep(1)
return(f)
}
resultsvec <- append(resultsvec, resultsvec_sub)
incProgress(1/runs)
}
})
stopCluster(cl)
#do stuff with resultsvec..
})
}
shinyApp(ui = ui, server = server)
如您所见,我在将任务分配给核心之前拆分了任务,并在所有核心上完成每次拆分后更新进度条。这当然效率更低,因为当拆分中的几乎所有任务都已完成时。一些核心可能处于空闲状态,直到其他核心完成并开始下一次拆分。
可以改进任务的拆分 process/distribution,但它现在正在工作。
我有一个闪亮的应用程序,它应该 运行 在多核上进行计算,同时通过进度条提供反馈。只要我不进一步处理期货的结果,这就可以正常工作(请参见下面的工作示例)。一旦我之后使用结果,进度条不会更新,直到所有期货都完成。
我使用包 future
、promises
和 ipc
进行进程间通信。我认为问题是 R 想要在结果一出来就继续使用 futures。我试图用 resolved() 或 resolve() 之类的命令停止算法,但没有任何进展。
library(shiny)
library(future)
library(promises)
library(ipc)
plan(list(multiprocess, sequential))
ui <- fluidPage(
actionButton(inputId = "go", label = "Launch calculation")
)
server <- function(input, output, session) {
observeEvent(input$go, {
x <- list()
N = availableCores()
Tasks = rep(10, N) #Number of sequential tasks per core
progress = list() #A list to maintain progress for each run
resultsvec <- c()
for(j in 1:N){
progress[[j]] = AsyncProgress$new(message = paste("analysis, core ",j))
x[[j]] <- future({
for(l in 1:Tasks[j]){
progress[[j]]$inc(1/Tasks[j])
resultsvec <- append(resultsvec, l)
Sys.sleep(1)
}
resultsvec
progress[[j]]$close()
})
}
result <- lapply(x, value)
#... do stuff with result
})
}
shinyApp(ui = ui, server = server)
这是正确更新进度条的服务器函数。
server <- function(input, output, session) {
observeEvent(input$go, {
x <- list()
N = availableCores()
Tasks = rep(10, N) #Number of sequential tasks per core
progress = list() #A list to maintain progress for each run
for(j in 1:N){
progress[[j]] = AsyncProgress$new(message = paste("analysis, core ",j))
x[[j]] <- future({
for(l in 1:Tasks[j]){
progress[[j]]$inc(1/Tasks[j])
Sys.sleep(1)
}
progress[[j]]$close()
})
}
})
}
我设法解决了我需要的问题,尽管该解决方案不再使用期货。我切换到 doSNOW
包。但据我所知,除了允许进程间通信的 future/promises
之外,doSNOW
或其他并行包中没有其他选项。所以这是我的解决方法。与上面相反,我在整个过程中使用了一个进度条。
library(shiny)
library(doSNOW)
ui <- fluidPage(
actionButton(inputId = "go", label = "Launch calculation")
)
server <- function(input, output, session) {
observeEvent(input$go, {
Tasks <- 40 #now total tasks to do
runs <- 10 #splitting of progress bar. 10 means every 10% it gets updated. 20 every 5% etc.
taskvec <- rep(Tasks %/% runs, runs)
if (Tasks %% runs != 0){
taskvec[1:(Tasks %% runs)] <- taskvec[1:(Tasks %% runs)] + 1
}
resultsvec <- c()
cl <- makeCluster(2)
registerDoSNOW(cl)
withProgress(message = "Analysis", value = 0,{
for (j in 1:runs) {
resultsvec_sub <- foreach(i = 1:taskvec[j],
.combine = append) %dopar% {
f <- i
Sys.sleep(1)
return(f)
}
resultsvec <- append(resultsvec, resultsvec_sub)
incProgress(1/runs)
}
})
stopCluster(cl)
#do stuff with resultsvec..
})
}
shinyApp(ui = ui, server = server)
如您所见,我在将任务分配给核心之前拆分了任务,并在所有核心上完成每次拆分后更新进度条。这当然效率更低,因为当拆分中的几乎所有任务都已完成时。一些核心可能处于空闲状态,直到其他核心完成并开始下一次拆分。 可以改进任务的拆分 process/distribution,但它现在正在工作。