使用 foreach Rcpp 和 RNG 的并行计算比预期慢得多

parallel computing with foreach Rcpp and RNG much slower than expected

我有一个 Rcpp 函数,它输出一个我想保存为 R 对象的大矩阵。我的想法是同时使用我的 Rcpp 函数和 foreach 包来加快速度。

对于相同的矩阵大小,在我的 Windows 机器上使用 foreach 的时间大约是 运行 不使用 foreach 的函数的五倍多(不包括工人的设置)。 我知道有关并行执行非常小的任务的问题(例如 Why is the parallel package slower than just using apply?)。此外,我愿意抛开 运行 并行随机数生成器的理论问题,因为结果可能不再是真正随机的。

由于我的子任务应该足够大,显然我写的Rcpp函数不能很好地并行工作,但我不知道为什么。在 Rcpp 函数中使用 RNG 只是一项无法并行化的任务吗?除此之外:我的子矩阵在 foreach 中是否有一个最优的 i 和一个最优的 ncol(此处 n_bootstrap)?非常感谢任何帮助。如果您愿意,也请随时对代码发表评论。

澄清:我编译了一个包并在foreach

中使用了mypackage::funC

这是 R 中的示例代码:

y <- funC(n_bootstrap = 250, n_obs_censusdata = 300000,
          locationeffects = as.numeric(1:200), 
          residuals = as.numeric(1:20000),
          X = matrix(as.numeric(1:3000000), ncol = 10), 
          beta_sample = matrix(as.numeric(1:2500), ncol = 250))

并行:

no_cores <- parallel::detectCores() - 2
cl <- parallel::makeCluster(no_cores)
doParallel::registerDoParallel(cl)

y <- foreach(i=1:5, .combine = "cbind") %dopar% {

  funC(n_bootstrap = 50,
       n_obs_censusdata = 300000, locationeffects = as.numeric(1:200), 
       residuals = as.numeric(1:20000), 
       X = matrix(as.numeric(1:3000000), ncol = 10), 
       beta_sample = matrix(as.numeric(1:2500), ncol = 250))
                       }
parallel::stopCluster(cl)

添加:使用 bigstatsr

y <- bigstatsr::FBM(nrow = 300000, ncol = 250, type = "double")
bigstatsr::big_apply(y, a.FUN = function(y, ind, fun) {
          y[, ind] <- fun(n_bootstrap = length(ind),
                                    n_obs_censusdata = 300000,
                                    locationeffects = as.numeric(1:200),
                                    residuals = as.numeric(1:20000),
                                    X = matrix(as.numeric(1:3000000), ncol = 10), 
                                    beta_sample =  matrix(as.numeric(1:2500), ncol = 250))
          NULL
        }, a.combine = 'c', ncores = bigstatsr::nb_cores(), fun = funC)+

这是 Rcpp 代码:

// -*- mode: C++; c-indent-level: 4; c-basic-offset: 4; indent-tabs-mode: nil; -*-

#include <RcppEigen.h>
#include <random>

using namespace Rcpp;
// [[Rcpp::depends(RcppEigen)]]
// [[Rcpp::plugins(cpp11)]]
// [[Rcpp::export]]
SEXP funC(const int n_bootstrap,
          const int n_obs_censusdata, 
          const Eigen::Map<Eigen::VectorXd> locationeffects, 
          const Eigen::Map<Eigen::VectorXd> residuals,
          const Eigen::Map<Eigen::MatrixXd> X, 
          const Eigen::Map<Eigen::MatrixXd> beta_sample)
{

  // --------- create random sample of locations and of residuals --------- //

    // initialise random seeds 
  std::random_device rd; // used to obtain a seed for the number engine
  std::mt19937 gen(rd()); // Mersenne Twister engine 

  // initialize distributions for randam locations and residuals
  const int upperlocation = locationeffects.size();
  const int upperresiduals = residuals.size();

  std::uniform_int_distribution<> distrloc(1, upperlocation);
  std::uniform_int_distribution<> distrres(1, upperresiduals);

  // initialize and fill matrix for randam locations and residuals 
  Eigen::MatrixXd LocationEffectResiduals(n_obs_censusdata, n_bootstrap);

  for (int i=0; i<n_obs_censusdata; ++i)
    for (int j=0; j<n_bootstrap; j++)
      LocationEffectResiduals(i,j) = locationeffects[distrloc(gen)-1] + residuals[distrres(gen)-1]; // subtract 1 because in C++ indices start with 0

  // ----- create Xbeta ------- //
    Eigen::MatrixXd Xbeta = X * beta_sample;

  // ----- combine results ------- //
    Eigen::MatrixXd returnmatrix = Xbeta + LocationEffectResiduals;

  return Rcpp::wrap(returnmatrix);
}

这里你要创建一个大矩阵。原则上可以将其分配给多个进程,但最终要承担合并结果的成本。我建议在这里使用 "shared memory parallelism"。我使用 OpenMP code from here 作为算法并行版本的起点:

// [[Rcpp::depends(RcppEigen)]]
#include <RcppEigen.h>
// [[Rcpp::depends(dqrng)]]
#include <xoshiro.h>
// [[Rcpp::plugins(openmp)]]
#include <omp.h>
// [[Rcpp::plugins(cpp11)]]
#include <random>

// [[Rcpp::export]]
Eigen::MatrixXd funD(const int n_bootstrap,
                     const int n_obs_censusdata, 
                     const Eigen::Map<Eigen::VectorXd> locationeffects, 
                     const Eigen::Map<Eigen::VectorXd> residuals,
                     const Eigen::Map<Eigen::MatrixXd> X, 
                     const Eigen::Map<Eigen::MatrixXd> beta_sample,
                     int ncores) {

  // --------- create random sample of locations and of residuals --------- //

  // initialise random seeds 
  std::random_device rd; // used to obtain a seed for the number engine
  dqrng::xoshiro256plus gen(rd());

  // initialize distributions for randam locations and residuals
  const int upperlocation = locationeffects.size();
  const int upperresiduals = residuals.size();

   // subtract 1 because in C++ indices start with 0
  std::uniform_int_distribution<> distrloc(0, upperlocation - 1);
  std::uniform_int_distribution<> distrres(0, upperresiduals - 1);

  // initialize and fill matrix for randam locations and residuals 
  Eigen::MatrixXd LocationEffectResiduals(n_obs_censusdata, n_bootstrap);

  #pragma omp parallel num_threads(ncores)
  {
    dqrng::xoshiro256plus lgen(gen);      // make thread local copy of rng 
    lgen.jump(omp_get_thread_num() + 1);  // advance rng by 1 ... ncores jumps 

    #pragma omp for
    for (int i=0; i<n_obs_censusdata; ++i)
      for (int j=0; j<n_bootstrap; j++)
        LocationEffectResiduals(i,j) = locationeffects[distrloc(lgen)] + residuals[distrres(lgen)];
  }  

  // ----- create Xbeta ------- //
  Eigen::MatrixXd Xbeta = X * beta_sample;

  // ----- combine results ------- //
  Eigen::MatrixXd returnmatrix = Xbeta + LocationEffectResiduals;

  return returnmatrix;
}

在我的双核 Linux 系统上,我的 funDncores = 1 比你的 funC 稍快,可能是因为使用的 RNG 更快。使用 ncores = 2 它又获得了 30-40%。不错,因为并非所有代码都是并行执行的。我不知道现在 Windows 上的 OpenMP 性能有多好。改用 RcppParallel 可能更有意义。但这需要对您的代码进行更多更改。

以上代码应来源于 Rcpp::sourceCpp()。当你把它放入一个包中时,你应该使用

CXX_STD = CXX11
PKG_CXXFLAGS = $(SHLIB_OPENMP_CXXFLAGS)
PKG_LIBS = $(SHLIB_OPENMP_CXXFLAGS)

Makevars(.win) 中。请注意,根据 WRE,如果 C++11 与 C++98 使用不同的编译器,这可能不会像预期的那样工作。 IIRC Solaris 是唯一采用默认配置的平台。所以对于内部包你应该没问题。