在 R 中执行未来包会导致无休止的等待时间

Execution of future package in R results in an endless waiting time

我对未来的包裹有疑问。在我的任务中,我尝试设置一个异步进程。我正在使用期货来做到这一点。如果我 运行 我的脚本是第一次(在干净的 RSession 中),一切都按预期正常工作。 运行 在同一个 R Session 中第二次执行相同的函数,最终陷入无尽的等待。执行在 Futures 开始的那一行停止。没有错误被抛出。该代码永远 运行s。如果我手动中断代码,则会从以下行调用浏览器: Sys.sleep(间隔)。 稍早一点执行此操作,调用来自: 调用自:socketSelect(list(con), write = FALSE, timeout = timeout).

我写了一个小程序,和我的脚本结构基本一样,也出现了同样的问题。虽然在这个小例子中并不明显,但这种结构在我的原始代码中有一些优势:

 library(future)
library(parallel)

asynchronousfunction <- function(){

 Threads.2.start <- availableCores()
 cl <- parallel::makePSOCKcluster(Threads.2.start)
 plan(cluster, workers = cl)

 threads <- lapply(1:Threads.2.start, function(index){
   future::cluster({Sys.getpid()},persistent = TRUE, workers = cl[[index]])
 })

 while(!any(resolved(threads))){

    Sys.sleep(0.1)

 }

 threads <- lapply(1:Threads.2.start, function(index){
   future::cluster({Sys.getpid()},persistent = TRUE, workers = cl[[index]])
 })

 stopCluster(cl = cl)

}


asynchronousfunction() # First call to the function. Everything is working fine.
asynchronousfunction() #Second call to the function. Endless Execution.

我正在研究 Windows 10,R 版本是 3.4.2。软件包版本为 1.6.2.

希望大家帮帮我。

提前致谢。

此致,

哈佛

作者 future 在这里。看起来你有点过头了,我不是 100% 确定你想要达到的目标。我觉得可疑的是你使用了:

  • cluster() - 改为调用 future()
  • cluster(..., workers = cl[[index]]) - 设置 future 时不要指定 workers
  • 您想使用 persistent = TRUE 的原因吗?
  • resolve(threads) 基本上和你的 while() 循环一样。
  • 您没有收集期货的价值,即您没有调用 value()values()
  • 对于故障排除,您可以通过设置 option(future.debug = TRUE).
  • 获取有关幕后情况的更多详细信息

如果我将您的示例重写为接近您现在的示例,则工作示例如下所示:

library("future")

asynchronousfunction <- function() {
  n <- availableCores()
  cl <- makeClusterPSOCK(n)
  plan(cluster, workers = cl)

  fs <- lapply(1:n, function(index) {
    future({ Sys.getpid() }, persistent = TRUE)
  })

  ## Can be replaced by resolve(fs)
  while(!any(resolved(fs))) {
    Sys.sleep(0.1)
  }

  fs <- lapply(1:n, function(index) {
    future({ Sys.getpid() }, persistent = TRUE)
  })

  parallel::stopCluster(cl = cl)
}

不用自己滚动 lapply() + future(),使用 future_lapply() 就足够了吗?例如,

asynchronousfunction <- function() {
  n <- availableCores()
  cl <- makeClusterPSOCK(n)
  plan(cluster, workers = cl)

  pids <- future_lapply(1:n, function(ii) {
    Sys.getpid()
  })
  str(pids)

  pids <- future_lapply(1:n, function(ii) {
    Sys.getpid()
  })
  str(pids)

  parallel::stopCluster(cl = cl)
}