优化比 python 版本慢 运行 的 Fortran 代码
Optimize fortran code that is running slower than the its python version
我有一个 Fortran 代码,我正在使用 f2py 将其编译为 python 中的 运行。我编写这段代码是为了更快地处理已经存在的 python 代码,但它实际上 运行ning 比它的 python 对应代码慢,这让我认为它没有优化. (这可能与 有关,尽管在那种情况下的罪魁祸首是我在这里没有使用的东西,所以它不适用于我。)
当输入使用维度 3 和维度 4(就好像它们是 x 和 y)执行二维相关时,代码获得一个 4D 矩阵。
密码是这个:
SUBROUTINE correlate4D(Vars, nxc, nyc, nv, nt, vCorr)
! Correlates 4D array assuming that x, y are dims 3 and 4
! Returns a 4D array with shape nv, nt, 2*nxc+1, 2*nyc+1
IMPLICIT NONE
real(kind=8), intent(in), dimension(:,:,:,:) :: Vars
real(kind=8) :: dummysize
integer, intent(in) :: nxc, nyc
integer :: ii, i, iix, iiy, iv, it, dims(4), nv, nt, nx, ny
integer, dimension(2*nxc+1) :: xdel
integer, dimension(2*nyc+1) :: ydel
real(kind=8), intent(out) :: vCorr(nv, nt, 2*nxc+1, 2*nyc+1)
real(kind=8), dimension(:,:,:,:), allocatable :: rolled, prerolled
real(kind=8), dimension(:,:), allocatable :: Mean
real(kind=8), dimension(:,:,:), allocatable :: Mean3d
dims = shape(Vars)
nx=dims(3)
ny=dims(4)
dummysize=nx*ny
allocate(rolled(nv, nt, nx, ny))
allocate(prerolled(nv, nt, nx, ny))
allocate(Mean3d(nv, nt, nx))
allocate(Mean(nv, nt))
Mean3d = sum(Vars, dim=4)/size(Vars, dim=4)
Mean = sum(Mean3d, dim=3)/size(Mean3d, dim=3)
! These replace np.arange()
ii=1
do i=-nxc,+nxc
xdel(ii)=i
ii=ii+1
enddo
ii=1
do i=-nyc,+nyc
ydel(ii)=i
ii=ii+1
enddo
! Calculate the correlation
do iiy=1,size(ydel)
print*,'fortran loop:',iiy,' of', size(ydel)
! cshift replaces np.roll()
prerolled = cshift(Vars, ydel(iiy), dim=4)
do iix=1,size(xdel)
rolled = cshift(prerolled, xdel(iix), dim=3)
forall (it=1:nt)
forall (iv=1:nv)
vCorr(iv,it,iix,iiy) = (sum(Vars(iv,it,:,:) * rolled(iv,it,:,:))/dummysize) / (Mean(iv,it)**2)
endforall
endforall
enddo
enddo
END SUBROUTINE
运行 大小为 (3, 50, 100, 100)
的矩阵用 f2py
编译的代码需要 251 秒,而纯 python/numpy 代码只需要 103 秒。顺便说一下,这有点像输入的矩阵的平均大小应该是 (3, 300, 100, 100)
,但不会比那个大很多。
任何人都可以向我指出可以优化此代码的方法吗?
编辑
我用f2py3.4 -c mwe.f90 -m mwe
编译,然后可以用
调用
In [1]: import mwe
In [2]: import numpy as np
In [3]: a=np.random.randn(3,15,100,100)
In [4]: mwe.correlate4d(a, 50, 50, 3, 15)
EDIT2
阅读评论后,我能够通过更改索引的顺序来改进它。现在它比 Python 快了大约 10%,但还是太慢了。我相信这可以更快地完成。
SUBROUTINE correlate4D2(Vars, nxc, nyc, nt, nv, vCorr)
! Correlates 4D array assuming that x, y are dims 1 and 2
! Returns a 4D array with shape 2*nxc+1, 2*nyc+1, nt, nv
IMPLICIT NONE
INTEGER, PARAMETER :: dp = SELECTED_REAL_KIND (13)
real(kind=8), intent(in), dimension(:,:,:,:) :: Vars
real(kind=8) :: dummysize
integer, intent(in) :: nxc, nyc
integer :: ii, i, iix, iiy, iv, it, dims(4), nv, nt, nx, ny
integer, dimension(2*nxc+1) :: xdel
integer, dimension(2*nyc+1) :: ydel
!real(kind=8), intent(out) :: vCorr(nv, nt, 2*nxc+1, 2*nyc+1)
real(kind=8), intent(out) :: vCorr(2*nxc+1, 2*nyc+1, nt, nv)
real(kind=8), dimension(:,:,:,:), allocatable :: rolled, prerolled
real(kind=8), dimension(:,:), allocatable :: Mean
real(kind=8), dimension(:,:,:), allocatable :: Mean3d
dims = shape(Vars)
nx=dims(1)
ny=dims(1)
dummysize=nx*ny
allocate(rolled(nx, ny, nt, nv))
allocate(prerolled(nx, ny, nt, nv))
allocate(Mean3d(ny, nt, nv))
allocate(Mean(nt, nv))
Mean3d = sum(Vars, dim=1)/size(Vars, dim=1)
Mean = sum(Mean3d, dim=1)/size(Mean3d, dim=1)
ii=1
do i=-nxc,+nxc
xdel(ii)=i
ii=ii+1
enddo
ii=1
do i=-nyc,+nyc
ydel(ii)=i
ii=ii+1
enddo
do iiy=1,size(ydel)
print*,'fortran loop:',iiy,' of', size(ydel)
prerolled = cshift(Vars, ydel(iiy), dim=2)
do iix=1,size(xdel)
rolled = cshift(prerolled, xdel(iix), dim=1)
forall (iv=1:nv)
forall (it=1:nt)
vCorr(iix,iiy,it,iv) = (sum(Vars(:,:,it,iv) * rolled(:,:,it,iv))/dummysize) / (Mean(it,iv)**2)
endforall
endforall
enddo
enddo
END SUBROUTINE
此外,即使现在代码中有一个 dp
参数(应该是 returns 8),如果我用 real(dp)
声明变量,f2py 会抛出这个错误: Parameter 'dp' at (1) has not been declared or is a variable
,即使它被声明了。所以这就是我直接使用 8
的原因。
注意:接下来是一个相当冗长乏味的答案...
因为对大矩阵重复使用cshift()
似乎很昂贵,所以我尝试了一些围绕cshift
的修改。为此,我首先创建了 OP 代码的最小版本:
program main
implicit none
integer, parameter :: N = 100, nt = 50, dp = kind(0.0d0)
real(dp), allocatable, dimension(:,:,:) :: A, Ashift_y, Ashift, B
integer :: sx, sy, i, t
allocate( A( N, N, nt ), Ashift_y( N, N, nt ), Ashift( N, N, nt ), &
B( -N:N-1, -N:N-1, nt ) )
call initA
do sy = -N, N-1
if ( mod( sy, N/10 ) == 0 ) print *, "sy = ", sy
Ashift_y = cshift( A, sy, dim=2 )
do sx = -N, N-1
Ashift = cshift( Ashift_y, sx, dim=1 )
do t = 1, nt
B( sx, sy, t )= sum( A( :, :, t ) * Ashift( :, :, t ) )
enddo
enddo
enddo
print *, "sum(B) = ", sum(B)
print *, "sum( B( 0:N-1, 0:N-1, : ) ) = ", sum( B( 0:N-1, 0:N-1, : ) )
contains
subroutine initA
integer ix, iy
forall( t = 1:nt, iy = 1:N, ix = 1:N ) & ! (forall not recommended but for brevity)
A( ix, iy, t ) = 1.0_dp / ( mod(ix + iy + t, 100) + 1 )
endsubroutine
endprogram
这给出了
sum(B) = 53817771.021093562
sum( B( 0:N-1, 0:N-1, : ) ) = 13454442.755258575
Mac mini (2.3GHz,4-core), gfortran-6.3 -O3 : 50 sec
Linux (2.6GHz,16-core), gfortran-4.8 -O3 : 32 sec
接下来,因为cshift(A,s,dim=1 (or 2))
相对于移位s
是周期性的(具有周期性N
),sx
和sy
的循环可以分成四部分,只保留第一象限(即 [0,N-1] 中的 sx
和 sy
)。其他象限的数据是通过简单的复制第一象限的数据得到的。这使 CPU 时间减少了 4。(更简单地说,我们可以仅在 [-N/2,N/2] 中计算 sx
和 sy
因为B
其他地区未提供新信息。)
do sy = 0, N-1
if ( mod( sy, N/10 ) == 0 ) print *, "sy = ", sy
Ashift_y = cshift( A, sy, dim=2 )
do sx = 0, N-1
Ashift = cshift( Ashift_y, sx, dim=1 )
do t = 1, nt
B( sx, sy, t )= sum( A( :, :, t ) * Ashift( :, :, t ) )
enddo
enddo
enddo
print *, "sum( B( 0:N-1, 0:N-1, : ) ) = ", sum( B( 0:N-1, 0:N-1, : ) )
!! Make "full" B.
B( -N : -1, 0 : N-1, : ) = B( 0 : N-1, 0 : N-1, : )
B( 0 : N-1, -N : -1, : ) = B( 0 : N-1, 0 : N-1, : )
B( -N : -1, -N : -1, : ) = B( 0 : N-1, 0 : N-1, : )
print *, "sum(B) = ", sum(B)
结果与完整计算一致,符合预期:
sum(B) = 53817771.021093562
sum( B( 0:N-1, 0:N-1, : ) ) = 13454442.755258575
Mac : 12.8 sec
Linux : 8.3 sec
对应的Python代码可能是这样的:
from __future__ import print_function, division
import numpy as np
N, nt = 100, 50
A = np.zeros( (nt, N, N) )
B = np.zeros( (nt, N, N) )
for t in range(nt):
for iy in range(N):
for ix in range(N):
A[ t, iy, ix ] = 1.0 / ( (ix + iy + t) % 100 + 1 )
for sy in range( N ):
if sy % (N // 10) == 0 : print( "sy = ", sy )
Ashift_y = np.roll( A, -sy, axis=1 )
for sx in range( N ):
Ashift = np.roll( Ashift_y, -sx, axis=2 )
for t in range( nt ):
B[ t, sy, sx ] = np.sum( A[ t, :, : ] * Ashift[ t, :, : ] )
print( "sum( B( :, 0:N-1, 0:N-1 ) ) = ", np.sum( B ) )
在 Mac 和 Linux (python3.5).
上都运行了 22--24 秒
为了进一步降低成本,我们利用了 cshift
可以以两种等效方式使用的事实:
cshift( array, s ) == array( cshift( [1,2,...,n], s ) ) !! assuming that "array" is declared as a( n )
然后我们可以重写上面的代码使得cshift()
只接收ind = [1,2,...,N]
:
integer, dimension(N) :: ind, indx, indy
ind = [( i, i=1,N )]
do sy = 0, N-1
if ( mod( sy, N/10 ) == 0 ) print *, "sy = ", sy
indy = cshift( ind, sy )
do sx = 0, N-1
indx = cshift( ind, sx )
do t = 1, nt
B( sx, sy, t )= sum( A( :, :, t ) * A( indx, indy, t ) )
enddo
enddo
enddo
在 Mac 和 Linux 上都在大约 5 秒内运行。类似的方法可能也适用于 Python。 (我也尝试过使用 mod()
显式索引来完全消除 cshift
,但有点令人惊讶的是,它比上面的代码慢了两倍多......)
即使进行了这种减少,对于大 nt
(比如问题中显示的 300),代码也会变慢。在这种情况下,我们可能会求助于最终武器,使 sy
上的循环并行化:
program main
implicit none
integer, parameter :: N = 100, nt = 50, dp = kind(0.0d0)
! integer, parameter :: N = 100, nt = 300, dp = kind(0.0d0)
real(dp), allocatable, dimension(:,:,:) :: A, B
integer, dimension(N) :: ind, indx, indy
integer :: sx, sy, i, t
allocate( A( N, N, nt ), B( -N:N-1, -N:N-1, nt ) )
call initA
ind = [( i, i=1,N )]
!$omp parallel do private(sx,sy,t,indx,indy)
do sy = 0, N-1
if ( mod( sy, N/10 ) == 0 ) print *, "sy = ", sy
indy = cshift( ind, sy )
do sx = 0, N-1
indx = cshift( ind, sx )
do t = 1, nt
B( sx, sy, t )= sum( A( :, :, t ) * A( indx, indy, t ) )
enddo
enddo
enddo
!$omp end parallel do
print *, "sum( B( 0:N-1, 0:N-1, : ) ) = ", sum( B( 0:N-1, 0:N-1, : ) )
! "contains subroutine initA ..." here
endprogram
时序数据是这样的(用gfortran -O3 -fopenmp):
N = 100, nt = 50
sum( B( 0:N-1, 0:N-1, : ) ) = 13454442.755258575
Mac:
serial : 5.3 sec
2 threads : 2.6 sec
4 threads : 1.4 sec
N = 100, nt = 50
sum( B( 0:N-1, 0:N-1, : ) ) = 13454442.755258575
Linux:
serial : 4.8 sec
2 threads : 2.7 sec
4 threads : 1.3 sec
8 threads : 0.7 sec
16 threads : 0.4 sec
32 threads : 0.4 sec
N = 100, nt = 300 // heavy case
sum( B( 0:N-1, 0:N-1, : ) ) = 80726656.531429410
Linux:
2 threads: 16.5 sec
4 threads: 8.4 sec
8 threads: 4.4 sec
16 threads: 2.5 sec
所以,如果上面的代码没有错误(希望如此!),我们可以通过 (1) 将 sx
和 sy
限制为 [0] 来节省很多 CPU 时间,N-1](或更简单的 [-N/2,N/2],无需进一步复制),(2) 将 cshift
应用于索引数组(而不是数据数组),and/or (3) sy
上的并行化(希望可以与 f2py 结合...)
我有一个 Fortran 代码,我正在使用 f2py 将其编译为 python 中的 运行。我编写这段代码是为了更快地处理已经存在的 python 代码,但它实际上 运行ning 比它的 python 对应代码慢,这让我认为它没有优化. (这可能与
当输入使用维度 3 和维度 4(就好像它们是 x 和 y)执行二维相关时,代码获得一个 4D 矩阵。
密码是这个:
SUBROUTINE correlate4D(Vars, nxc, nyc, nv, nt, vCorr)
! Correlates 4D array assuming that x, y are dims 3 and 4
! Returns a 4D array with shape nv, nt, 2*nxc+1, 2*nyc+1
IMPLICIT NONE
real(kind=8), intent(in), dimension(:,:,:,:) :: Vars
real(kind=8) :: dummysize
integer, intent(in) :: nxc, nyc
integer :: ii, i, iix, iiy, iv, it, dims(4), nv, nt, nx, ny
integer, dimension(2*nxc+1) :: xdel
integer, dimension(2*nyc+1) :: ydel
real(kind=8), intent(out) :: vCorr(nv, nt, 2*nxc+1, 2*nyc+1)
real(kind=8), dimension(:,:,:,:), allocatable :: rolled, prerolled
real(kind=8), dimension(:,:), allocatable :: Mean
real(kind=8), dimension(:,:,:), allocatable :: Mean3d
dims = shape(Vars)
nx=dims(3)
ny=dims(4)
dummysize=nx*ny
allocate(rolled(nv, nt, nx, ny))
allocate(prerolled(nv, nt, nx, ny))
allocate(Mean3d(nv, nt, nx))
allocate(Mean(nv, nt))
Mean3d = sum(Vars, dim=4)/size(Vars, dim=4)
Mean = sum(Mean3d, dim=3)/size(Mean3d, dim=3)
! These replace np.arange()
ii=1
do i=-nxc,+nxc
xdel(ii)=i
ii=ii+1
enddo
ii=1
do i=-nyc,+nyc
ydel(ii)=i
ii=ii+1
enddo
! Calculate the correlation
do iiy=1,size(ydel)
print*,'fortran loop:',iiy,' of', size(ydel)
! cshift replaces np.roll()
prerolled = cshift(Vars, ydel(iiy), dim=4)
do iix=1,size(xdel)
rolled = cshift(prerolled, xdel(iix), dim=3)
forall (it=1:nt)
forall (iv=1:nv)
vCorr(iv,it,iix,iiy) = (sum(Vars(iv,it,:,:) * rolled(iv,it,:,:))/dummysize) / (Mean(iv,it)**2)
endforall
endforall
enddo
enddo
END SUBROUTINE
运行 大小为 (3, 50, 100, 100)
的矩阵用 f2py
编译的代码需要 251 秒,而纯 python/numpy 代码只需要 103 秒。顺便说一下,这有点像输入的矩阵的平均大小应该是 (3, 300, 100, 100)
,但不会比那个大很多。
任何人都可以向我指出可以优化此代码的方法吗?
编辑
我用f2py3.4 -c mwe.f90 -m mwe
编译,然后可以用
In [1]: import mwe
In [2]: import numpy as np
In [3]: a=np.random.randn(3,15,100,100)
In [4]: mwe.correlate4d(a, 50, 50, 3, 15)
EDIT2
阅读评论后,我能够通过更改索引的顺序来改进它。现在它比 Python 快了大约 10%,但还是太慢了。我相信这可以更快地完成。
SUBROUTINE correlate4D2(Vars, nxc, nyc, nt, nv, vCorr)
! Correlates 4D array assuming that x, y are dims 1 and 2
! Returns a 4D array with shape 2*nxc+1, 2*nyc+1, nt, nv
IMPLICIT NONE
INTEGER, PARAMETER :: dp = SELECTED_REAL_KIND (13)
real(kind=8), intent(in), dimension(:,:,:,:) :: Vars
real(kind=8) :: dummysize
integer, intent(in) :: nxc, nyc
integer :: ii, i, iix, iiy, iv, it, dims(4), nv, nt, nx, ny
integer, dimension(2*nxc+1) :: xdel
integer, dimension(2*nyc+1) :: ydel
!real(kind=8), intent(out) :: vCorr(nv, nt, 2*nxc+1, 2*nyc+1)
real(kind=8), intent(out) :: vCorr(2*nxc+1, 2*nyc+1, nt, nv)
real(kind=8), dimension(:,:,:,:), allocatable :: rolled, prerolled
real(kind=8), dimension(:,:), allocatable :: Mean
real(kind=8), dimension(:,:,:), allocatable :: Mean3d
dims = shape(Vars)
nx=dims(1)
ny=dims(1)
dummysize=nx*ny
allocate(rolled(nx, ny, nt, nv))
allocate(prerolled(nx, ny, nt, nv))
allocate(Mean3d(ny, nt, nv))
allocate(Mean(nt, nv))
Mean3d = sum(Vars, dim=1)/size(Vars, dim=1)
Mean = sum(Mean3d, dim=1)/size(Mean3d, dim=1)
ii=1
do i=-nxc,+nxc
xdel(ii)=i
ii=ii+1
enddo
ii=1
do i=-nyc,+nyc
ydel(ii)=i
ii=ii+1
enddo
do iiy=1,size(ydel)
print*,'fortran loop:',iiy,' of', size(ydel)
prerolled = cshift(Vars, ydel(iiy), dim=2)
do iix=1,size(xdel)
rolled = cshift(prerolled, xdel(iix), dim=1)
forall (iv=1:nv)
forall (it=1:nt)
vCorr(iix,iiy,it,iv) = (sum(Vars(:,:,it,iv) * rolled(:,:,it,iv))/dummysize) / (Mean(it,iv)**2)
endforall
endforall
enddo
enddo
END SUBROUTINE
此外,即使现在代码中有一个 dp
参数(应该是 returns 8),如果我用 real(dp)
声明变量,f2py 会抛出这个错误: Parameter 'dp' at (1) has not been declared or is a variable
,即使它被声明了。所以这就是我直接使用 8
的原因。
注意:接下来是一个相当冗长乏味的答案...
因为对大矩阵重复使用cshift()
似乎很昂贵,所以我尝试了一些围绕cshift
的修改。为此,我首先创建了 OP 代码的最小版本:
program main
implicit none
integer, parameter :: N = 100, nt = 50, dp = kind(0.0d0)
real(dp), allocatable, dimension(:,:,:) :: A, Ashift_y, Ashift, B
integer :: sx, sy, i, t
allocate( A( N, N, nt ), Ashift_y( N, N, nt ), Ashift( N, N, nt ), &
B( -N:N-1, -N:N-1, nt ) )
call initA
do sy = -N, N-1
if ( mod( sy, N/10 ) == 0 ) print *, "sy = ", sy
Ashift_y = cshift( A, sy, dim=2 )
do sx = -N, N-1
Ashift = cshift( Ashift_y, sx, dim=1 )
do t = 1, nt
B( sx, sy, t )= sum( A( :, :, t ) * Ashift( :, :, t ) )
enddo
enddo
enddo
print *, "sum(B) = ", sum(B)
print *, "sum( B( 0:N-1, 0:N-1, : ) ) = ", sum( B( 0:N-1, 0:N-1, : ) )
contains
subroutine initA
integer ix, iy
forall( t = 1:nt, iy = 1:N, ix = 1:N ) & ! (forall not recommended but for brevity)
A( ix, iy, t ) = 1.0_dp / ( mod(ix + iy + t, 100) + 1 )
endsubroutine
endprogram
这给出了
sum(B) = 53817771.021093562
sum( B( 0:N-1, 0:N-1, : ) ) = 13454442.755258575
Mac mini (2.3GHz,4-core), gfortran-6.3 -O3 : 50 sec
Linux (2.6GHz,16-core), gfortran-4.8 -O3 : 32 sec
接下来,因为cshift(A,s,dim=1 (or 2))
相对于移位s
是周期性的(具有周期性N
),sx
和sy
的循环可以分成四部分,只保留第一象限(即 [0,N-1] 中的 sx
和 sy
)。其他象限的数据是通过简单的复制第一象限的数据得到的。这使 CPU 时间减少了 4。(更简单地说,我们可以仅在 [-N/2,N/2] 中计算 sx
和 sy
因为B
其他地区未提供新信息。)
do sy = 0, N-1
if ( mod( sy, N/10 ) == 0 ) print *, "sy = ", sy
Ashift_y = cshift( A, sy, dim=2 )
do sx = 0, N-1
Ashift = cshift( Ashift_y, sx, dim=1 )
do t = 1, nt
B( sx, sy, t )= sum( A( :, :, t ) * Ashift( :, :, t ) )
enddo
enddo
enddo
print *, "sum( B( 0:N-1, 0:N-1, : ) ) = ", sum( B( 0:N-1, 0:N-1, : ) )
!! Make "full" B.
B( -N : -1, 0 : N-1, : ) = B( 0 : N-1, 0 : N-1, : )
B( 0 : N-1, -N : -1, : ) = B( 0 : N-1, 0 : N-1, : )
B( -N : -1, -N : -1, : ) = B( 0 : N-1, 0 : N-1, : )
print *, "sum(B) = ", sum(B)
结果与完整计算一致,符合预期:
sum(B) = 53817771.021093562
sum( B( 0:N-1, 0:N-1, : ) ) = 13454442.755258575
Mac : 12.8 sec
Linux : 8.3 sec
对应的Python代码可能是这样的:
from __future__ import print_function, division
import numpy as np
N, nt = 100, 50
A = np.zeros( (nt, N, N) )
B = np.zeros( (nt, N, N) )
for t in range(nt):
for iy in range(N):
for ix in range(N):
A[ t, iy, ix ] = 1.0 / ( (ix + iy + t) % 100 + 1 )
for sy in range( N ):
if sy % (N // 10) == 0 : print( "sy = ", sy )
Ashift_y = np.roll( A, -sy, axis=1 )
for sx in range( N ):
Ashift = np.roll( Ashift_y, -sx, axis=2 )
for t in range( nt ):
B[ t, sy, sx ] = np.sum( A[ t, :, : ] * Ashift[ t, :, : ] )
print( "sum( B( :, 0:N-1, 0:N-1 ) ) = ", np.sum( B ) )
在 Mac 和 Linux (python3.5).
上都运行了 22--24 秒为了进一步降低成本,我们利用了 cshift
可以以两种等效方式使用的事实:
cshift( array, s ) == array( cshift( [1,2,...,n], s ) ) !! assuming that "array" is declared as a( n )
然后我们可以重写上面的代码使得cshift()
只接收ind = [1,2,...,N]
:
integer, dimension(N) :: ind, indx, indy
ind = [( i, i=1,N )]
do sy = 0, N-1
if ( mod( sy, N/10 ) == 0 ) print *, "sy = ", sy
indy = cshift( ind, sy )
do sx = 0, N-1
indx = cshift( ind, sx )
do t = 1, nt
B( sx, sy, t )= sum( A( :, :, t ) * A( indx, indy, t ) )
enddo
enddo
enddo
在 Mac 和 Linux 上都在大约 5 秒内运行。类似的方法可能也适用于 Python。 (我也尝试过使用 mod()
显式索引来完全消除 cshift
,但有点令人惊讶的是,它比上面的代码慢了两倍多......)
即使进行了这种减少,对于大 nt
(比如问题中显示的 300),代码也会变慢。在这种情况下,我们可能会求助于最终武器,使 sy
上的循环并行化:
program main
implicit none
integer, parameter :: N = 100, nt = 50, dp = kind(0.0d0)
! integer, parameter :: N = 100, nt = 300, dp = kind(0.0d0)
real(dp), allocatable, dimension(:,:,:) :: A, B
integer, dimension(N) :: ind, indx, indy
integer :: sx, sy, i, t
allocate( A( N, N, nt ), B( -N:N-1, -N:N-1, nt ) )
call initA
ind = [( i, i=1,N )]
!$omp parallel do private(sx,sy,t,indx,indy)
do sy = 0, N-1
if ( mod( sy, N/10 ) == 0 ) print *, "sy = ", sy
indy = cshift( ind, sy )
do sx = 0, N-1
indx = cshift( ind, sx )
do t = 1, nt
B( sx, sy, t )= sum( A( :, :, t ) * A( indx, indy, t ) )
enddo
enddo
enddo
!$omp end parallel do
print *, "sum( B( 0:N-1, 0:N-1, : ) ) = ", sum( B( 0:N-1, 0:N-1, : ) )
! "contains subroutine initA ..." here
endprogram
时序数据是这样的(用gfortran -O3 -fopenmp):
N = 100, nt = 50
sum( B( 0:N-1, 0:N-1, : ) ) = 13454442.755258575
Mac:
serial : 5.3 sec
2 threads : 2.6 sec
4 threads : 1.4 sec
N = 100, nt = 50
sum( B( 0:N-1, 0:N-1, : ) ) = 13454442.755258575
Linux:
serial : 4.8 sec
2 threads : 2.7 sec
4 threads : 1.3 sec
8 threads : 0.7 sec
16 threads : 0.4 sec
32 threads : 0.4 sec
N = 100, nt = 300 // heavy case
sum( B( 0:N-1, 0:N-1, : ) ) = 80726656.531429410
Linux:
2 threads: 16.5 sec
4 threads: 8.4 sec
8 threads: 4.4 sec
16 threads: 2.5 sec
所以,如果上面的代码没有错误(希望如此!),我们可以通过 (1) 将 sx
和 sy
限制为 [0] 来节省很多 CPU 时间,N-1](或更简单的 [-N/2,N/2],无需进一步复制),(2) 将 cshift
应用于索引数组(而不是数据数组),and/or (3) sy
上的并行化(希望可以与 f2py 结合...)