使用 g++ 对带位操作的循环进行自动矢量化

Automatic vectorization with g++ of a loop with bit operations

是否可以矢量化此循环(使用 g++)?

char x;
int k;
for(int s = 0; s < 4; s++) {
  A[k++] += B[x&3];
  x >>= 2;
}

AB 是指向非重叠浮点数组的指针; B 的索引为 0 到 3。我需要最大限度地提高可移植性,因为这是针对 R 包的,所以最好的方式是重写,使 g++ 能够单独对其进行矢量化,如我不知道如何在这种情况下使 SSE 代码可移植(包 RcppEigen 使库 Eigen 可用,所以这是可能的)。

非常感谢您的想法。

P.S.嵌套的代码看起来像

int k = 0;
for(size_t j = 0; j < J; j++) {
  char x = data[j];
  for(int s = 0; s < 4; s++) {
    A[k++] += B[x&3];
    x >>= 2;
  }
}

有一个使用 AVX2 的解决方案:

__m256 _B = _mm256_setr_ps(B[0], B[1], B[2], B[3], B[0], B[1], B[2], B[3]);
__m256i _shift = _mm256_setr_epi32(0, 2, 4, 6, 8, 10, 12, 14);
__m256i _mask = _mm256_set1_epi32(3);
for (size_t j = 0; j < J/2; j++)
{
    short x = ((short*)data)[j];
    __m256i _index = _mm256_and_si256(_mm256_srlv_epi32(_mm256_set1_epi32(x), _shift), _mask);
    _mm256_storeu_ps(A, _mm256_add_ps(_mm256_loadu_ps(A), _mm256_permutevar8x32_ps(_B, _index)));
    A += 8;
}

如果不求助于 SIMD 内在函数,我认为您无能为力,但您至少可以鼓励编译器对每次迭代的 4 次加法进行矢量化:

int k = 0;
for(size_t j = 0; j < J; j++) {
  char x = data[j];
  float C[4] __attribute__ ((aligned(16)));
  for(int s = 0; s < 4; s++) {
    C[s] = B[x&3];
    x >>= 2;
  }
  for(int s = 0; s < 4; s++) {
    A[k + s] += C[s];           // this loop should be vectorized
  }
  k += 4;
}

根据您的外循环有多大,可能值得为您将要添加到您的内循环中的四个常量集预先计算一个 LUT:

const float C[256][4] = {
    { B[0], B[0], B[0], B[0] },
    { B[1], B[0], B[0], B[0] },
    { B[2], B[0], B[0], B[0] },
    { B[3], B[0], B[0], B[0] },
    { B[0], B[1], B[0], B[0] },
    { B[1], B[1], B[0], B[0] },
    { B[2], B[1], B[0], B[0] },
    { B[3], B[1], B[0], B[0] },
    ...
    { B[3], B[3], B[3], B[3] }
} __attribute__ ((aligned(16)));

(您可能希望以编程方式生成此 LUT,而不是像上面那样对其进行内联编码)。

那么实际处理可以简化为:

int k = 0;
for(size_t j = 0; j < J; j++) {
  const uint8_t x = data[j];
  for(int s = 0; s < 4; s++) {
    A[k + s] += C[x][s];
  }
  k += 4;
}

如果您的编译器今天过得愉快,它应该完全矢量化。

显然,只有当 J 足够大以至于设置 C LUT 的成本可以分摊到大量后续迭代中时,这才值得。

我找到了另一种使用 SSSE3 指令进行优化的方法:

__m128i _B = _mm_castps_si128(_mm_loadu_ps(B));
__m128i _mul = _mm_setr_epi16(64, 64, 16, 16, 4, 4, 1, 1);
__m128i _and = _mm_set1_epi8(12);
__m128i _or = _mm_setr_epi8(0, 1, 2, 3, 0, 1, 2, 3, 0, 1, 2, 3, 0, 1, 2, 3);
for (size_t j = 0; j < J; j++)
{
    const char x = data[j];
    __m128i x1 = _mm_set1_epi8(x);
    __m128i x2 = _mm_mullo_epi16(x1, _mul);
    __m128i x3 = _mm_srli_epi16(x2, 4);
    __m128i x4 = _mm_and_si128(x3, _and);
    __m128i x5 = _mm_or_si128(x4, _or);
    _mm_storeu_ps(A, _mm_add_ps(_mm_loadu_ps(A), _mm_castsi128_ps(_mm_shuffle_epi8(_B, x5))));
    A += 4;
}

但我认为以前的变体更好。

这是我已经提到的 c++14 尝试更快地破解它的尝试,它产生的结果小于零甚至没有;事实上 vectorized_op 在工作中比 unvectorized_op 大约慢 20%,在模拟版本中慢 200%。

g++-4.9.2/clang++-3.5.0 在 x86:64 Debian Jessie 上 与

 -std=c++14 -O3 -Wall -pedantic -msse4.1 -ftree-vectorize -ffast-math

(resp += -stdlib=libc++ for clang)

(更新)

#include <iterator>
#include <algorithm>
#include <valarray>
#include <array>
#include <iostream>
#include <iomanip>
#include <cstdlib>
#include <ctime>
#include <fstream>

using std::size_t;
using std::endl;
auto& Log = std::cout;

using real_type = float;
using ra_type = std::valarray<real_type>;
using ia_type = std::valarray<size_t>;
using workload_type = unsigned char;
using ca_type = std::valarray<workload_type>;

static constexpr bool BUILD_PIVOTS_H = false;
static constexpr size_t B_SIZE = 4;
static constexpr size_t MAX_CHAR = 256;
static constexpr size_t PRECOMPUTED_SIZE = MAX_CHAR * B_SIZE;
static std::array<workload_type, PRECOMPUTED_SIZE> PRECOMPUTED __attribute__ ((aligned(16)));
static std::array<real_type, PRECOMPUTED_SIZE> LUT __attribute__ ((aligned(16)));

struct sentry {
    sentry(const std::string& s) {
        Log << s << endl;
    }    
};


void unvectorized_op(ra_type& A, ra_type&B, const ca_type& data) {
    static sentry _("unvectorized_op:");
    int k = 0;
    for(size_t j = 0; j < data.size(); ++j) {
        char x = data[j];
#ifdef MOCK        
        for (auto& i : {0,1,2,3})
            A[k++] = real_type(0);
#else        
        for(int s = 0; s < 4; ++s) {
            A[k++] += B[x&3];
            x >>= 2;
        }
#endif        
    }
}    


void vectorized_op(ra_type& A, ra_type& B, const ca_type& data) {
    static sentry _("vectorized_op:");
    ra_type bflat(A.size());

    auto beg = std::begin(bflat);
    std::for_each(std::begin(data), std::end(data), 
        [&beg] (const workload_type& x) {
            const auto& it = &LUT[/*unsigned workload_type)*/(x)*4];
#ifdef MOCK            
            for (auto& i : {0,1,2,3})
                *(beg++) = real_type(0);
#else                       
            std::for_each(it, it + 4, 
                [&beg] (const real_type& c) {*beg = c; ++beg;}
            );
#endif                                   
        }
    );

    A = bflat;
}


int main (int argc, char* argv[]) {
    int magnitude  = -1;

    if (argc > 1) {
        try {
            magnitude = std::stoi(argv[1]);
        } catch (const std::runtime_error& e ){
            /*ignore*/
        }       
    }

    if ((magnitude < 1) || (magnitude > 10)) {
       Log << "no magnitude between 2 and 8 given; choosing 5\n";  
       magnitude = 5;
    }

    /* precomputing the pivoting */ {
        size_t i = 0;
        workload_type x = 0; 

        std::generate(std::begin(PRECOMPUTED), std::end(PRECOMPUTED),
            [&i, &x] () {
                workload_type rv = x & 3;
                x >>= 2;                  
                ++i;
                (i % B_SIZE) == 0 ? x = i/B_SIZE : x >>= 2;
                return rv;
            }
        );
    }


    ra_type B(B_SIZE);

    ::srand48(::clock());
    std::generate(std::begin(B), std::end(B), ::drand48);
    std::transform(std::begin(PRECOMPUTED), std::end(PRECOMPUTED), 
                   std::begin(LUT),
                   [&B] (const workload_type& c) {
                       return B[c];
                   });


    for (auto &f : {unvectorized_op, vectorized_op}) {
        for (int i = 1; i < magnitude; ++i) {
            size_t workload_size = pow(10, i);
            size_t repetitions = pow(10, magnitude - i);

            ca_type data(workload_size);
            std::generate(std::begin(data), std::end(data), 
                    [] () {return workload_type(::drand48() * MAX_CHAR);});

            ra_type A(0., workload_size * B_SIZE);

            auto start = ::clock();
            for (unsigned long j = 0; j < repetitions; ++j) {
                A = 0.;
                f(A, B, data);
            }
            auto duration = ::clock() - start;

            Log << std::setw(8) << repetitions   << " repetitions with a "
                << std::setw(8) << workload_size << " workload took " 
                << double(duration)/CLOCKS_PER_SEC << "s" 
                << endl; 
        } // for i in magnitudes
    } // for auto f

    if (BUILD_PIVOTS_H) {
        try { 
            std::ofstream pivots_h("pivots.h");
            pivots_h << "#ifndef PIVOTS_H_ \n\n";
            pivots_h << "const char PIVOTS[256][4] = {\n";
            for (auto i = 0; i < 256; ++i) {
                pivots_h << "    {";
                for (auto j = 0; j < 4; ++j) {
                    pivots_h << long(PRECOMPUTED[i*4+j]) << (j < 3 ? ", " : "");
                }
                pivots_h << ("}") << (i < 255 ? ",\n" : "\n");
            }
            pivots_h << "} __attribute__ ((aligned(16)));\n";
            pivots_h << "#endif\n" << std::endl;
        } catch (const std::runtime_error& e) {
            Log << "ERROR: " << e.what() << std::endl;
        }
    }
}

这是 Fortran 95 版本:

module setup
    implicit none
    logical GENERATE_PIVOTS
    parameter(GENERATE_PIVOTS = .TRUE.)
    integer, parameter                        :: CHAR_TYPE = 1
    integer, parameter                        :: REAL_TYPE = 4
    integer(CHAR_TYPE), dimension(0:255, 0:3) :: PIVOTS 
    real   (REAL_TYPE), dimension(0:255, 0:3) :: LUT 
    real(REAL_TYPE), dimension(0:3)           :: B    

  contains

    subroutine init(B)
        implicit none
        real(REAL_TYPE), dimension(0:3), intent(in) :: B
        integer                                     :: r, c

        if (GENERATE_PIVOTS) then
            do r = lbound(PIVOTS,1),ubound(PIVOTS,1)
                do c = lbound(PIVOTS,2),ubound(PIVOTS,2)
                    PIVOTS(r,c) = int(iand(ishft(r, -c), 3), kind=1)
                end do
            end do
        else 
            !> @TODO load PIVOTS FROM file
        end if

        do r = 0,255
            do c = 0,3
                LUT = B(PIVOTS(r,c))
            end do
        end do
    end subroutine init

end module setup

module testling
    use setup 
    implicit none

  contains

    subroutine generate(A, data)
        real(REAL_TYPE), dimension(:,:),  intent(inout):: A
        integer(CHAR_TYPE), dimension(:), intent(in)   :: data
        integer                                        :: i
        !A  = LUT(data, :)
        forall (i=lbound(A,1):ubound(A,1))
            A(i,:) = LUT(data(i), :) 
        end forall
    end subroutine generate

end module testling

subroutine do_test(workload_size, repetitions)
    use setup
    use testling
    implicit none
    integer, intent(in)                               :: workload_size
    integer, intent(in)                               :: repetitions  
    real(REAL_TYPE), dimension(0:workload_size-1,0:3) :: A    
    integer(CHAR_TYPE), dimension(0:workload_size-1)  :: data
    integer(8)                                        :: s, e, cr, cm
    integer(4)                                        :: i 
    real                                              :: random  

    make_workload : do i = lbound(data,1),ubound(data,1)
        call random_number(random)
        data(i) = int(floor(random * 256),kind=CHAR_TYPE)
    end do make_workload

    call system_clock(s, cr, cm)
        do i = 1,repetitions
            call generate(A, data)
        end do
    call system_clock(e, cr, cm)

    write(*,'(I8, " repetitions for a ",I8," workload took ",F12.10,"s")')&
        REPETITIONS, WORKLOAD_SIZE, real(e - s,kind = 8)/cr
end subroutine do_test


program main
    use setup
    implicit none
    integer(8)          :: c, cr, cm
    integer(4)          :: i, seed   
    character(len=100)  :: buffer
    integer(1)          :: magnitude = -1
    integer             :: workload_size
    integer             :: repetitions  

    if (iargc() > 0) then 
        call getarg(1, buffer)
        read(buffer,*) magnitude
    end if

    if ((magnitude < 2) .or. (magnitude > 10))  then 
        write(*,*) "no magnitude between 2 and 8 given; choosing 5"  
        magnitude = 5
    endif     

    call system_clock(c,cr,cm)
    seed = int(c/cr, kind=4)
    call random_seed(seed)

    fill_B: do i = lbound(B,1),ubound(B,1)
        call random_number(B(i))
    end do fill_B
    call init(B)

    do i=1,magnitude-1    
        workload_size = 10**i
        repetitions   = 10**(magnitude - i)
        call do_test(workload_size, repetitions)
    end do

end program main

gfortran-4.9 也用

调用
-O3 -Wall -pedantic -msse4.1 -ftree-vectorize -ffast-math

总结

运行 随机工作负载在 10 到 10M 之间。

未向量化的 C++ 版本和 Fortran 版本的规范化运行时间对于 < 1M 的工作负载略差于其标称 O(n) 参考工作负载大小,在 1M 以上大约翻倍,但仍然略差于标称 O(n) 参考 10M 工作负载的新基数。

Fortran 解决方案比未向量化的 C++ 版本快大约 40-50%。

带有 "vectorized" 标签的 C++ 函数充其量只是一场轻微的灾难;至少比非矢量化版本慢 20%。