Rcpp:我的距离矩阵程序比包中的函数慢
Rcpp: my distance matrix program is slower than the function in package
我想计算成对的欧式距离矩阵。我在 Dirk Eddelbuettel 的建议下编写了 Rcpp 程序如下
NumericMatrix calcPWD1 (NumericMatrix x){
int outrows = x.nrow();
double d;
NumericMatrix out(outrows,outrows);
for (int i = 0 ; i < outrows - 1; i++){
for (int j = i + 1 ; j < outrows ; j ++){
NumericVector v1= x.row(i);
NumericVector v2= x.row(j);
NumericVector v3=v1-v2;
d = sqrt(sum(pow(v3,2)));
out(j,i)=d;
out(i,j)=d;
}
}
return (out) ;
}
但我发现我的程序比 dist
函数慢。
> benchmark(as.matrix(dist(b)),calcPWD1(b))
test replications elapsed relative user.self sys.self user.child sys.child
1 as.matrix(dist(b)) 100 24.831 1.000 24.679 0.010 0 0
2 calcPWD1(b) 100 27.362 1.102 27.346 0.007 0 0
大家有什么建议吗?我的矩阵非常简单。没有列名或行名,只有普通矩阵(例如 b=matrix(c(rnorm(1000*10)),1000,10)
)。
这是dist
的程序
> dist
function (x, method = "euclidean", diag = FALSE, upper = FALSE,
p = 2)
{
if (!is.na(pmatch(method, "euclidian")))
method <- "euclidean"
METHODS <- c("euclidean", "maximum", "manhattan", "canberra",
"binary", "minkowski")
method <- pmatch(method, METHODS)
if (is.na(method))
stop("invalid distance method")
if (method == -1)
stop("ambiguous distance method")
x <- as.matrix(x)
N <- nrow(x)
attrs <- if (method == 6L)
list(Size = N, Labels = dimnames(x)[[1L]], Diag = diag,
Upper = upper, method = METHODS[method], p = p, call = match.call(),
class = "dist")
else list(Size = N, Labels = dimnames(x)[[1L]], Diag = diag,
Upper = upper, method = METHODS[method], call = match.call(),
class = "dist")
.Call(C_Cdist, x, method, attrs, p)
}
<bytecode: 0x56b0d40>
<environment: namespace:stats>
我希望我的程序比 dist
快,因为在 dist
中需要检查的东西太多(例如 method
、diag
)。
你几乎在那里。但是您的内部循环体试图在一行中做太多事情。模板编程已经够难了,有时最好把指令分散一点,给编译器一个更好的机会。所以我只做了五个语句,并立即构建。
新代码:
#include <Rcpp.h>
using namespace Rcpp;
double dist1 (NumericVector x, NumericVector y){
int n = y.length();
double total = 0;
for (int i = 0; i < n ; ++i) {
total += pow(x(i)-y(i),2.0);
}
total = sqrt(total);
return total;
}
// [[Rcpp::export]]
NumericMatrix calcPWD (NumericMatrix x){
int outrows = x.nrow();
int outcols = x.nrow();
NumericMatrix out(outrows,outcols);
for (int i = 0 ; i < outrows - 1; i++){
for (int j = i + 1 ; j < outcols ; j ++) {
NumericVector v1 = x.row(i);
NumericVector v2 = x.row(j-1);
double d = dist1(v1, v2);
out(j-1,i) = d;
out(i,j-1)= d;
}
}
return (out) ;
}
/*** R
M <- matrix(log(1:9), 3, 3)
calcPWD(M)
*/
运行它:
R> sourceCpp("/tmp/mikebrown.cpp")
R> M <- matrix(log(1:9), 3, 3)
R> calcPWD(M)
[,1] [,2] [,3]
[1,] 0.000000 0.740322 0
[2,] 0.740322 0.000000 0
[3,] 0.000000 0.000000 0
R>
不过您可能需要检查索引逻辑。看来您错过了更多比较。
编辑:对于踢球,这里有一个更紧凑的距离函数版本:
// [[Rcpp::export]]
double dist2(NumericVector x, NumericVector y){
double d = sqrt( sum( pow(x - y, 2) ) );
return d;
}
Rcpp 与内部 R 函数 (C/Fortran)
首先,仅仅因为您使用 Rcpp 编写算法并不一定意味着它会击败 R 等价物,特别是如果 R 函数调用 C 或 Fortran 例程来执行大部分计算。在函数纯粹用 R 编写的其他情况下,将其转换为 Rcpp 很可能会产生所需的速度增益。
请记住,在重写内部函数时,与绝对疯狂的 C 程序员组成的 R Core 团队较量最有可能获胜。
dist()
的基本实现
其次,R 使用的距离计算是在 C 中完成的,如下所示:
.Call(C_Cdist, x, method, attrs, p)
,这是 dist()
函数的 R 源代码的最后一行。这使它与 C++ 相比略有优势,因为它更细化而不是模板化。
此外,C implementation uses OpenMP 可用于并行计算。
建议修改
第三,通过稍微改变子集的顺序并避免创建额外的变量,版本之间的时间间隔减少了。
#include <Rcpp.h>
// [[Rcpp::export]]
Rcpp::NumericMatrix calcPWD1 (const Rcpp::NumericMatrix & x){
unsigned int outrows = x.nrow(), i = 0, j = 0;
double d;
Rcpp::NumericMatrix out(outrows,outrows);
for (i = 0; i < outrows - 1; i++){
Rcpp::NumericVector v1 = x.row(i);
for (j = i + 1; j < outrows ; j ++){
d = sqrt(sum(pow(v1-x.row(j), 2.0)));
out(j,i)=d;
out(i,j)=d;
}
}
return out;
}
我想计算成对的欧式距离矩阵。我在 Dirk Eddelbuettel 的建议下编写了 Rcpp 程序如下
NumericMatrix calcPWD1 (NumericMatrix x){
int outrows = x.nrow();
double d;
NumericMatrix out(outrows,outrows);
for (int i = 0 ; i < outrows - 1; i++){
for (int j = i + 1 ; j < outrows ; j ++){
NumericVector v1= x.row(i);
NumericVector v2= x.row(j);
NumericVector v3=v1-v2;
d = sqrt(sum(pow(v3,2)));
out(j,i)=d;
out(i,j)=d;
}
}
return (out) ;
}
但我发现我的程序比 dist
函数慢。
> benchmark(as.matrix(dist(b)),calcPWD1(b))
test replications elapsed relative user.self sys.self user.child sys.child
1 as.matrix(dist(b)) 100 24.831 1.000 24.679 0.010 0 0
2 calcPWD1(b) 100 27.362 1.102 27.346 0.007 0 0
大家有什么建议吗?我的矩阵非常简单。没有列名或行名,只有普通矩阵(例如 b=matrix(c(rnorm(1000*10)),1000,10)
)。
这是dist
> dist
function (x, method = "euclidean", diag = FALSE, upper = FALSE,
p = 2)
{
if (!is.na(pmatch(method, "euclidian")))
method <- "euclidean"
METHODS <- c("euclidean", "maximum", "manhattan", "canberra",
"binary", "minkowski")
method <- pmatch(method, METHODS)
if (is.na(method))
stop("invalid distance method")
if (method == -1)
stop("ambiguous distance method")
x <- as.matrix(x)
N <- nrow(x)
attrs <- if (method == 6L)
list(Size = N, Labels = dimnames(x)[[1L]], Diag = diag,
Upper = upper, method = METHODS[method], p = p, call = match.call(),
class = "dist")
else list(Size = N, Labels = dimnames(x)[[1L]], Diag = diag,
Upper = upper, method = METHODS[method], call = match.call(),
class = "dist")
.Call(C_Cdist, x, method, attrs, p)
}
<bytecode: 0x56b0d40>
<environment: namespace:stats>
我希望我的程序比 dist
快,因为在 dist
中需要检查的东西太多(例如 method
、diag
)。
你几乎在那里。但是您的内部循环体试图在一行中做太多事情。模板编程已经够难了,有时最好把指令分散一点,给编译器一个更好的机会。所以我只做了五个语句,并立即构建。
新代码:
#include <Rcpp.h>
using namespace Rcpp;
double dist1 (NumericVector x, NumericVector y){
int n = y.length();
double total = 0;
for (int i = 0; i < n ; ++i) {
total += pow(x(i)-y(i),2.0);
}
total = sqrt(total);
return total;
}
// [[Rcpp::export]]
NumericMatrix calcPWD (NumericMatrix x){
int outrows = x.nrow();
int outcols = x.nrow();
NumericMatrix out(outrows,outcols);
for (int i = 0 ; i < outrows - 1; i++){
for (int j = i + 1 ; j < outcols ; j ++) {
NumericVector v1 = x.row(i);
NumericVector v2 = x.row(j-1);
double d = dist1(v1, v2);
out(j-1,i) = d;
out(i,j-1)= d;
}
}
return (out) ;
}
/*** R
M <- matrix(log(1:9), 3, 3)
calcPWD(M)
*/
运行它:
R> sourceCpp("/tmp/mikebrown.cpp")
R> M <- matrix(log(1:9), 3, 3)
R> calcPWD(M)
[,1] [,2] [,3]
[1,] 0.000000 0.740322 0
[2,] 0.740322 0.000000 0
[3,] 0.000000 0.000000 0
R>
不过您可能需要检查索引逻辑。看来您错过了更多比较。
编辑:对于踢球,这里有一个更紧凑的距离函数版本:
// [[Rcpp::export]]
double dist2(NumericVector x, NumericVector y){
double d = sqrt( sum( pow(x - y, 2) ) );
return d;
}
Rcpp 与内部 R 函数 (C/Fortran)
首先,仅仅因为您使用 Rcpp 编写算法并不一定意味着它会击败 R 等价物,特别是如果 R 函数调用 C 或 Fortran 例程来执行大部分计算。在函数纯粹用 R 编写的其他情况下,将其转换为 Rcpp 很可能会产生所需的速度增益。
请记住,在重写内部函数时,与绝对疯狂的 C 程序员组成的 R Core 团队较量最有可能获胜。
dist()
的基本实现
其次,R 使用的距离计算是在 C 中完成的,如下所示:
.Call(C_Cdist, x, method, attrs, p)
,这是 dist()
函数的 R 源代码的最后一行。这使它与 C++ 相比略有优势,因为它更细化而不是模板化。
此外,C implementation uses OpenMP 可用于并行计算。
建议修改
第三,通过稍微改变子集的顺序并避免创建额外的变量,版本之间的时间间隔减少了。
#include <Rcpp.h>
// [[Rcpp::export]]
Rcpp::NumericMatrix calcPWD1 (const Rcpp::NumericMatrix & x){
unsigned int outrows = x.nrow(), i = 0, j = 0;
double d;
Rcpp::NumericMatrix out(outrows,outrows);
for (i = 0; i < outrows - 1; i++){
Rcpp::NumericVector v1 = x.row(i);
for (j = i + 1; j < outrows ; j ++){
d = sqrt(sum(pow(v1-x.row(j), 2.0)));
out(j,i)=d;
out(i,j)=d;
}
}
return out;
}