优化比 python 版本慢 运行 的 Fortran 代码

Optimize fortran code that is running slower than the its python version

我有一个 Fortran 代码,我正在使用 f2py 将其编译为 python 中的 运行。我编写这段代码是为了更快地处理已经存在的 python 代码,但它实际上 运行ning 比它的 python 对应代码慢,这让我认为它没有优化. (这可能与 有关,尽管在那种情况下的罪魁祸首是我在这里没有使用的东西,所以它不适用于我。)

当输入使用维度 3 和维度 4(就好像它们是 x 和 y)执行二维相关时,代码获得一个 4D 矩阵。

密码是这个:

SUBROUTINE correlate4D(Vars, nxc, nyc, nv, nt, vCorr)
! Correlates 4D array assuming that x, y are dims 3 and 4
! Returns a 4D array with shape nv, nt, 2*nxc+1, 2*nyc+1
IMPLICIT NONE
real(kind=8), intent(in), dimension(:,:,:,:) :: Vars
real(kind=8) :: dummysize
integer, intent(in) :: nxc, nyc
integer :: ii, i, iix, iiy, iv, it, dims(4), nv, nt, nx, ny
integer, dimension(2*nxc+1) :: xdel
integer, dimension(2*nyc+1) :: ydel
real(kind=8), intent(out) :: vCorr(nv, nt, 2*nxc+1, 2*nyc+1)
real(kind=8), dimension(:,:,:,:), allocatable :: rolled, prerolled
real(kind=8), dimension(:,:), allocatable :: Mean
real(kind=8), dimension(:,:,:), allocatable :: Mean3d

dims = shape(Vars)
nx=dims(3)
ny=dims(4)
dummysize=nx*ny
allocate(rolled(nv, nt, nx, ny))
allocate(prerolled(nv, nt, nx, ny))
allocate(Mean3d(nv, nt, nx))
allocate(Mean(nv, nt))

Mean3d = sum(Vars, dim=4)/size(Vars, dim=4)
Mean = sum(Mean3d, dim=3)/size(Mean3d, dim=3)

! These replace np.arange()
ii=1
do i=-nxc,+nxc
    xdel(ii)=i
    ii=ii+1
enddo
ii=1
do i=-nyc,+nyc
    ydel(ii)=i
    ii=ii+1
enddo

! Calculate the correlation
do iiy=1,size(ydel)
    print*,'fortran loop:',iiy,' of', size(ydel)
    ! cshift replaces np.roll()
    prerolled = cshift(Vars, ydel(iiy), dim=4)
    do iix=1,size(xdel)
        rolled = cshift(prerolled, xdel(iix), dim=3)
        forall (it=1:nt)
            forall (iv=1:nv)
                vCorr(iv,it,iix,iiy) = (sum(Vars(iv,it,:,:) * rolled(iv,it,:,:))/dummysize) / (Mean(iv,it)**2)
            endforall
        endforall
    enddo
enddo

END SUBROUTINE

运行 大小为 (3, 50, 100, 100) 的矩阵用 f2py 编译的代码需要 251 秒,而纯 python/numpy 代码只需要 103 秒。顺便说一下,这有点像输入的矩阵的平均大小应该是 (3, 300, 100, 100),但不会比那个大很多。

任何人都可以向我指出可以优化此代码的方法吗?

编辑

我用f2py3.4 -c mwe.f90 -m mwe编译,然后可以用

调用
In [1]: import mwe
In [2]: import numpy as np
In [3]: a=np.random.randn(3,15,100,100)
In [4]: mwe.correlate4d(a, 50, 50, 3, 15)

EDIT2

阅读评论后,我能够通过更改索引的顺序来改进它。现在它比 Python 快了大约 10%,但还是太慢了。我相信这可以更快地完成。

SUBROUTINE correlate4D2(Vars, nxc, nyc, nt, nv, vCorr)
! Correlates 4D array assuming that x, y are dims 1 and 2
! Returns a 4D array with shape 2*nxc+1, 2*nyc+1, nt, nv
IMPLICIT NONE
INTEGER, PARAMETER  ::  dp = SELECTED_REAL_KIND (13)
real(kind=8), intent(in), dimension(:,:,:,:) :: Vars
real(kind=8) :: dummysize
integer, intent(in) :: nxc, nyc
integer :: ii, i, iix, iiy, iv, it, dims(4), nv, nt, nx, ny
integer, dimension(2*nxc+1) :: xdel
integer, dimension(2*nyc+1) :: ydel
!real(kind=8), intent(out) :: vCorr(nv, nt, 2*nxc+1, 2*nyc+1)
real(kind=8), intent(out) :: vCorr(2*nxc+1, 2*nyc+1, nt, nv)
real(kind=8), dimension(:,:,:,:), allocatable :: rolled, prerolled
real(kind=8), dimension(:,:), allocatable :: Mean
real(kind=8), dimension(:,:,:), allocatable :: Mean3d

dims = shape(Vars)
nx=dims(1)
ny=dims(1)
dummysize=nx*ny
allocate(rolled(nx, ny, nt, nv))
allocate(prerolled(nx, ny, nt, nv))
allocate(Mean3d(ny, nt, nv))
allocate(Mean(nt, nv))

Mean3d = sum(Vars, dim=1)/size(Vars, dim=1)
Mean = sum(Mean3d, dim=1)/size(Mean3d, dim=1)

ii=1
do i=-nxc,+nxc
    xdel(ii)=i
    ii=ii+1
enddo
ii=1
do i=-nyc,+nyc
    ydel(ii)=i
    ii=ii+1
enddo

do iiy=1,size(ydel)
    print*,'fortran loop:',iiy,' of', size(ydel)
    prerolled = cshift(Vars, ydel(iiy), dim=2)
    do iix=1,size(xdel)
        rolled = cshift(prerolled, xdel(iix), dim=1)
        forall (iv=1:nv)
            forall (it=1:nt)
                vCorr(iix,iiy,it,iv) = (sum(Vars(:,:,it,iv) * rolled(:,:,it,iv))/dummysize) / (Mean(it,iv)**2)
            endforall
        endforall
    enddo
enddo

END SUBROUTINE

此外,即使现在代码中有一个 dp 参数(应该是 returns 8),如果我用 real(dp) 声明变量,f2py 会抛出这个错误: Parameter 'dp' at (1) has not been declared or is a variable,即使它被声明了。所以这就是我直接使用 8 的原因。

注意:接下来是一个相当冗长乏味的答案...

因为对大矩阵重复使用cshift()似乎很昂贵,所以我尝试了一些围绕cshift的修改。为此,我首先创建了 OP 代码的最小版本:

program main
    implicit none
    integer, parameter :: N = 100, nt = 50, dp = kind(0.0d0)
    real(dp), allocatable, dimension(:,:,:) :: A, Ashift_y, Ashift, B
    integer :: sx, sy, i, t

    allocate( A( N, N, nt ), Ashift_y( N, N, nt ), Ashift( N, N, nt ), &
              B( -N:N-1, -N:N-1, nt ) )
    call initA

    do sy = -N, N-1
        if ( mod( sy, N/10 ) == 0 ) print *, "sy = ", sy

        Ashift_y = cshift( A, sy, dim=2 )

        do sx = -N, N-1
            Ashift = cshift( Ashift_y, sx, dim=1 )

            do t = 1, nt
                B( sx, sy, t )= sum( A( :, :, t ) * Ashift( :, :, t ) )
            enddo
        enddo
    enddo

    print *, "sum(B) = ", sum(B)
    print *, "sum( B( 0:N-1, 0:N-1, : ) ) = ", sum( B( 0:N-1, 0:N-1, : ) )

contains
    subroutine initA
        integer ix, iy
        forall( t = 1:nt, iy = 1:N, ix = 1:N )  &   ! (forall not recommended but for brevity)
                A( ix, iy, t ) = 1.0_dp / ( mod(ix + iy + t, 100) + 1 )
    endsubroutine    
endprogram

这给出了

sum(B) =    53817771.021093562     
sum( B( 0:N-1, 0:N-1, : ) ) =    13454442.755258575 

Mac mini (2.3GHz,4-core), gfortran-6.3 -O3 : 50 sec
Linux (2.6GHz,16-core),   gfortran-4.8 -O3 : 32 sec

接下来,因为cshift(A,s,dim=1 (or 2))相对于移位s是周期性的(具有周期性N),sxsy的循环可以分成四部分,只保留第一象限(即 [0,N-1] 中的 sxsy)。其他象限的数据是通过简单的复制第一象限的数据得到的。这使 CPU 时间减少了 4。(更简单地说,我们可以仅在 [-N/2,N/2] 中计算 sxsy 因为B其他地区未提供新信息。)

    do sy = 0, N-1
        if ( mod( sy, N/10 ) == 0 ) print *, "sy = ", sy
        Ashift_y = cshift( A, sy, dim=2 )

        do sx = 0, N-1
            Ashift = cshift( Ashift_y, sx, dim=1 )

            do t = 1, nt
                B( sx, sy, t )= sum( A( :, :, t ) * Ashift( :, :, t ) )
            enddo
        enddo
    enddo

    print *, "sum( B( 0:N-1, 0:N-1, : ) ) = ", sum( B( 0:N-1, 0:N-1, : ) )

    !! Make "full" B.
    B( -N :  -1,  0 : N-1, : ) = B( 0 : N-1, 0 : N-1, : )
    B(  0 : N-1, -N :  -1, : ) = B( 0 : N-1, 0 : N-1, : )
    B( -N :  -1, -N :  -1, : ) = B( 0 : N-1, 0 : N-1, : )
    print *, "sum(B) = ", sum(B)

结果与完整计算一致,符合预期:

sum(B) =    53817771.021093562     
sum( B( 0:N-1, 0:N-1, : ) ) =    13454442.755258575     

Mac   : 12.8 sec
Linux :  8.3 sec

对应的Python代码可能是这样的:

from __future__ import print_function, division
import numpy as np

N, nt = 100, 50

A = np.zeros( (nt, N, N) )
B = np.zeros( (nt, N, N) )

for t in range(nt):
    for iy in range(N):
        for ix in range(N):
            A[ t, iy, ix ] = 1.0 / ( (ix + iy + t) % 100 + 1 )

for sy in range( N ):
    if sy % (N // 10) == 0 : print( "sy = ", sy )
    Ashift_y = np.roll( A, -sy, axis=1 )

    for sx in range( N ):
        Ashift = np.roll( Ashift_y, -sx, axis=2 )

        for t in range( nt ):
            B[ t, sy, sx ] = np.sum( A[ t, :, : ] * Ashift[ t, :, : ] )

print( "sum( B( :, 0:N-1, 0:N-1 ) ) = ",  np.sum( B ) )

在 Mac 和 Linux (python3.5).

上都运行了 22--24 秒

为了进一步降低成本,我们利用了 cshift 可以以两种等效方式使用的事实:

cshift( array, s ) == array( cshift( [1,2,...,n], s ) )   !! assuming that "array" is declared as a( n )

然后我们可以重写上面的代码使得cshift()只接收ind = [1,2,...,N]:

    integer, dimension(N) :: ind, indx, indy
    ind = [( i, i=1,N )]

    do sy = 0, N-1
        if ( mod( sy, N/10 ) == 0 ) print *, "sy = ", sy
        indy = cshift( ind, sy )

        do sx = 0, N-1
            indx = cshift( ind, sx )

            do t = 1, nt
                B( sx, sy, t )= sum( A( :, :, t ) * A( indx, indy, t ) )
            enddo
        enddo
    enddo

在 Mac 和 Linux 上都在大约 5 秒内运行。类似的方法可能也适用于 Python。 (我也尝试过使用 mod() 显式索引来完全消除 cshift ,但有点令人惊讶的是,它比上面的代码慢了两倍多......)

即使进行了这种减少,对于大 nt(比如问题中显示的 300),代码也会变慢。在这种情况下,我们可能会求助于最终武器,使 sy 上的循环并行化:

program main
    implicit none
    integer, parameter :: N = 100, nt = 50, dp = kind(0.0d0)
!    integer, parameter :: N = 100, nt = 300, dp = kind(0.0d0)
    real(dp), allocatable, dimension(:,:,:) :: A, B
    integer, dimension(N) :: ind, indx, indy
    integer :: sx, sy, i, t

    allocate( A( N, N, nt ), B( -N:N-1, -N:N-1, nt ) )
    call initA
    ind = [( i, i=1,N )]

    !$omp parallel do private(sx,sy,t,indx,indy)
    do sy = 0, N-1
        if ( mod( sy, N/10 ) == 0 ) print *, "sy = ", sy
        indy = cshift( ind, sy )

        do sx = 0, N-1
            indx = cshift( ind, sx )

            do t = 1, nt
                B( sx, sy, t )= sum( A( :, :, t ) * A( indx, indy, t ) )
            enddo
        enddo
    enddo
    !$omp end parallel do

    print *, "sum( B( 0:N-1, 0:N-1, : ) ) = ", sum( B( 0:N-1, 0:N-1, : ) )

    ! "contains subroutine initA ..." here

endprogram

时序数据是这样的(用gfortran -O3 -fopenmp):

N = 100, nt = 50
sum( B( 0:N-1, 0:N-1, : ) ) =    13454442.755258575     
Mac:
   serial : 5.3 sec
2 threads : 2.6 sec
4 threads : 1.4 sec

N = 100, nt = 50
sum( B( 0:N-1, 0:N-1, : ) ) =    13454442.755258575     
Linux:
    serial : 4.8 sec
 2 threads : 2.7 sec
 4 threads : 1.3 sec
 8 threads : 0.7 sec
16 threads : 0.4 sec
32 threads : 0.4 sec

N = 100, nt = 300   // heavy case
sum( B( 0:N-1, 0:N-1, : ) ) =    80726656.531429410     
Linux:
 2 threads: 16.5 sec
 4 threads:  8.4 sec
 8 threads:  4.4 sec
16 threads:  2.5 sec

所以,如果上面的代码没有错误(希望如此!),我们可以通过 (1) 将 sxsy 限制为 [0] 来节省很多 CPU 时间,N-1](或更简单的 [-N/2,N/2],无需进一步复制),(2) 将 cshift 应用于索引数组(而不是数据数组),and/or (3) sy 上的并行化(希望可以与 f2py 结合...)