R中的并行foreach共享内存
Shared memory in parallel foreach in R
问题描述:
我有一个大矩阵 c
,已加载到 RAM 内存中。我的目标是通过并行处理对其进行只读访问。然而,当我使用 doSNOW、doMPI、big.matrix 等创建连接时,使用的 RAM 数量急剧增加。
有没有办法正确创建共享内存,所有进程都可以从中读取,而不创建所有数据的本地副本?
示例:
# Install any libraries that are missing, then attach each of them.
libs <- function(libraries) {
  for (lib in libraries) {
    if (!(lib %in% .packages(all.available = TRUE))) {
      install.packages(lib)
    }
    library(lib, character.only = TRUE)
  }
}

libra <- list("foreach", "parallel", "doSNOW", "bigmemory")
libs(libra)
#create a matrix of size 1GB approximately
c<-matrix(runif(10000^2),10000,10000)
#convert it to bigmatrix
x<-as.big.matrix(c)
# get a description of the matrix
mdesc <- describe(x)
# Create the required connections
cl <- makeCluster(detectCores ())
registerDoSNOW(cl)
# NOTE(review): `c` above shadows base::c, and the loop body assigns to `c`
# (`c<-m[1,1]`), so the 1GB matrix is referenced inside the %dopar%
# expression -- presumably this makes foreach auto-export a full copy of it
# to every worker, which would explain the memory blow-up described above.
# Verify by renaming the result variable (e.g. xyz <- m[1,1]).
out<-foreach(linID = 1:10, .combine=c) %dopar% {
#load bigmemory
require(bigmemory)
# attach the matrix via shared memory??
m <- attach.big.matrix(mdesc)
#dummy expression to test data acquisition
c<-m[1,1]
}
closeAllConnections()
内存:
在上图中,您可能会发现内存增加很多,直到 foreach
结束并被释放。
我想问题的解决办法可以从foreach
包的作者 Steve Weston 的帖子中看到。他在那里说:
The doParallel package will auto-export variables to the workers that are referenced in the foreach loop.
所以我认为问题是在您的代码中,您的大矩阵 c
在作业 c<-m[1,1]
中被引用。只需尝试 xyz <- m[1,1]
,看看会发生什么。
这是一个文件支持的例子 big.matrix
:
#create a matrix of size 1GB approximately
n <- 10000
m <- 10000
c <- matrix(runif(n*m),n,m)
#convert it to a file-backed big.matrix: workers can then attach it via
#the descriptor file instead of receiving a copy of the data
x <- as.big.matrix(x = c, type = "double",
separated = FALSE,
backingfile = "example.bin",
descriptorfile = "example.desc")
# get a description of the matrix
mdesc <- describe(x)
# Create the required connections
cl <- makeCluster(detectCores ())
registerDoSNOW(cl)
## 1) No referencing: the loop body only mentions the descriptor file name,
## so no large object is auto-exported to the workers.
out <- foreach(linID = 1:4, .combine = c) %dopar% {
  big <- attach.big.matrix("example.desc")
  for (row in seq_len(30L)) {
    for (col in seq_len(m)) {
      cell <- big[row, col]
    }
  }
  0L
}
## 2) Referencing: merely mentioning `c` in the loop body causes the whole
## matrix to be exported to every worker.
out <- foreach(linID = 1:4, .combine = c) %dopar% {
  invisible(c) ## c is referenced and thus exported to workers
  big <- attach.big.matrix("example.desc")
  for (row in seq_len(30L)) {
    for (col in seq_len(m)) {
      cell <- big[row, col]
    }
  }
  0L
}
closeAllConnections()
或者,如果您在 Linux/Mac 上并且想要 CoW 共享内存,请使用 fork。首先将所有数据加载到主线程中,然后使用 parallel
包中的通用函数 mcparallel
启动工作线程(分支)。
您可以使用 mccollect
或使用 Rdsm
库使用真正的共享内存来收集他们的结果,如下所示:
library(parallel)
library(bigmemory)  # for shared variables

# FIX: `size` was used below without ever being defined, so the original
# snippet failed with "object 'size' not found". One slot suffices here.
size <- 1
shared <- bigmemory::big.matrix(nrow = size, ncol = 1, type = 'double')
shared[1] <- 1                         # init shared memory with some number
job <- mcparallel({shared[1] <- 23})   # ...change it in another forked process
shared[1, 1]                           # ...and confirm that it gets changed
# [1] 23
您可以确认,如果延迟写入,该值确实会在后台更新:
# Writer that updates the shared slot after a one-second delay, so the
# background update can be observed from the parent process.
fn <- function() {
  Sys.sleep(1)  # one second delay
  shared[1] <- 11
}
job <- mcparallel(fn())
shared[1]    # execute immediately after the last command
# [1] 23
# FIX: the original read `aaa[1,1]`, but no object `aaa` exists -- the
# shared matrix is named `shared`.
shared[1, 1] # execute after one second
# [1] 11
mccollect()  # destroy all forked processes (and possibly collect their output)
要控制并发并避免竞争条件,请使用锁:
library(synchronicity)  # for locks

m <- boost.mutex()  # create a mutex "m"

# Unsynchronised increment: two concurrent calls can read the same value
# and one of the updates is lost (race condition).
bad.incr <- function() {
  a <- shared[1]
  Sys.sleep(1)
  shared[1] <- a + 1
}

# Synchronised increment: the mutex serialises the read-modify-write.
# FIX: release the lock via on.exit() so that an error inside the critical
# section cannot leave the mutex locked and deadlock the other forks.
good.incr <- function() {
  lock(m)
  on.exit(unlock(m), add = TRUE)
  a <- shared[1]
  Sys.sleep(1)
  shared[1] <- a + 1
}

shared[1] <- 1
for (i in 1:5) job <- mcparallel(bad.incr())
shared[1]    # verify the value did NOT get increased 5 times (races)
mccollect()  # reap all forks, discarding their values

shared[1] <- 1
for (i in 1:5) job <- mcparallel(good.incr())
shared[1]    # as expected, eventually (after ~5 s of waiting) you get 6
# [1] 6
mccollect()
编辑:
我通过将 Rdsm::mgrmakevar
交换为 bigmemory::big.matrix
来简化依赖关系。 mgrmakevar
内部调用 big.matrix
,我们不需要更多。
问题描述:
我有一个大矩阵 c
,已加载到 RAM 内存中。我的目标是通过并行处理对其进行只读访问。然而,当我使用 doSNOW、doMPI、big.matrix 等创建连接时,使用的 RAM 数量急剧增加。
有没有办法正确创建共享内存,所有进程都可以从中读取,而不创建所有数据的本地副本?
示例:
# Install any libraries that are missing, then attach each of them.
libs <- function(libraries) {
  for (lib in libraries) {
    if (!(lib %in% .packages(all.available = TRUE))) {
      install.packages(lib)
    }
    library(lib, character.only = TRUE)
  }
}

libra <- list("foreach", "parallel", "doSNOW", "bigmemory")
libs(libra)
#create a matrix of size 1GB approximately
c<-matrix(runif(10000^2),10000,10000)
#convert it to bigmatrix
x<-as.big.matrix(c)
# get a description of the matrix
mdesc <- describe(x)
# Create the required connections
cl <- makeCluster(detectCores ())
registerDoSNOW(cl)
# NOTE(review): `c` above shadows base::c, and the loop body assigns to `c`
# (`c<-m[1,1]`), so the 1GB matrix is referenced inside the %dopar%
# expression -- presumably this makes foreach auto-export a full copy of it
# to every worker, which would explain the memory blow-up described above.
# Verify by renaming the result variable (e.g. xyz <- m[1,1]).
out<-foreach(linID = 1:10, .combine=c) %dopar% {
#load bigmemory
require(bigmemory)
# attach the matrix via shared memory??
m <- attach.big.matrix(mdesc)
#dummy expression to test data acquisition
c<-m[1,1]
}
closeAllConnections()
内存:
在上图中,您可能会发现内存增加很多,直到 foreach
结束并被释放。
我想问题的解决办法可以从foreach
包的作者 Steve Weston 的帖子中看到。他在那里说:
The doParallel package will auto-export variables to the workers that are referenced in the foreach loop.
所以我认为问题是在您的代码中,您的大矩阵 c
在作业 c<-m[1,1]
中被引用。只需尝试 xyz <- m[1,1]
,看看会发生什么。
这是一个文件支持的例子 big.matrix
:
#create a matrix of size 1GB approximately
n <- 10000
m <- 10000
c <- matrix(runif(n*m),n,m)
#convert it to a file-backed big.matrix: workers can then attach it via
#the descriptor file instead of receiving a copy of the data
x <- as.big.matrix(x = c, type = "double",
separated = FALSE,
backingfile = "example.bin",
descriptorfile = "example.desc")
# get a description of the matrix
mdesc <- describe(x)
# Create the required connections
cl <- makeCluster(detectCores ())
registerDoSNOW(cl)
## 1) No referencing: the loop body only mentions the descriptor file name,
## so no large object is auto-exported to the workers.
out <- foreach(linID = 1:4, .combine = c) %dopar% {
  big <- attach.big.matrix("example.desc")
  for (row in seq_len(30L)) {
    for (col in seq_len(m)) {
      cell <- big[row, col]
    }
  }
  0L
}
## 2) Referencing: merely mentioning `c` in the loop body causes the whole
## matrix to be exported to every worker.
out <- foreach(linID = 1:4, .combine = c) %dopar% {
  invisible(c) ## c is referenced and thus exported to workers
  big <- attach.big.matrix("example.desc")
  for (row in seq_len(30L)) {
    for (col in seq_len(m)) {
      cell <- big[row, col]
    }
  }
  0L
}
closeAllConnections()
或者,如果您在 Linux/Mac 上并且想要 CoW 共享内存,请使用 fork。首先将所有数据加载到主线程中,然后使用 parallel
包中的通用函数 mcparallel
启动工作线程(分支)。
您可以使用 mccollect
或使用 Rdsm
库使用真正的共享内存来收集他们的结果,如下所示:
library(parallel)
library(bigmemory)  # for shared variables

# FIX: `size` was used below without ever being defined, so the original
# snippet failed with "object 'size' not found". One slot suffices here.
size <- 1
shared <- bigmemory::big.matrix(nrow = size, ncol = 1, type = 'double')
shared[1] <- 1                         # init shared memory with some number
job <- mcparallel({shared[1] <- 23})   # ...change it in another forked process
shared[1, 1]                           # ...and confirm that it gets changed
# [1] 23
您可以确认,如果延迟写入,该值确实会在后台更新:
# Writer that updates the shared slot after a one-second delay, so the
# background update can be observed from the parent process.
fn <- function() {
  Sys.sleep(1)  # one second delay
  shared[1] <- 11
}
job <- mcparallel(fn())
shared[1]    # execute immediately after the last command
# [1] 23
# FIX: the original read `aaa[1,1]`, but no object `aaa` exists -- the
# shared matrix is named `shared`.
shared[1, 1] # execute after one second
# [1] 11
mccollect()  # destroy all forked processes (and possibly collect their output)
要控制并发并避免竞争条件,请使用锁:
library(synchronicity)  # for locks

m <- boost.mutex()  # create a mutex "m"

# Unsynchronised increment: two concurrent calls can read the same value
# and one of the updates is lost (race condition).
bad.incr <- function() {
  a <- shared[1]
  Sys.sleep(1)
  shared[1] <- a + 1
}

# Synchronised increment: the mutex serialises the read-modify-write.
# FIX: release the lock via on.exit() so that an error inside the critical
# section cannot leave the mutex locked and deadlock the other forks.
good.incr <- function() {
  lock(m)
  on.exit(unlock(m), add = TRUE)
  a <- shared[1]
  Sys.sleep(1)
  shared[1] <- a + 1
}

shared[1] <- 1
for (i in 1:5) job <- mcparallel(bad.incr())
shared[1]    # verify the value did NOT get increased 5 times (races)
mccollect()  # reap all forks, discarding their values

shared[1] <- 1
for (i in 1:5) job <- mcparallel(good.incr())
shared[1]    # as expected, eventually (after ~5 s of waiting) you get 6
# [1] 6
mccollect()
编辑:
我通过将 Rdsm::mgrmakevar
交换为 bigmemory::big.matrix
来简化依赖关系。 mgrmakevar
内部调用 big.matrix
,我们不需要更多。