在大型 C++ 数组中高效地计算均值

Efficiently calculate mean in a large C++ array

我有一个包含 7 个块的 65K 个样本的数据集:float arr[7][65536]我需要计算每个对应的 7 个数字的平均值,因此结果将是一个大小为 65536 的数组:

float result[0] = arr[0][0] + arr[1][0] + arr[2][0] + ... / 7
float result[1] = arr[0][1] + arr[1][1] + arr[2][1] + ... / 7

问题是不按顺序访问内存会造成很多缓存未命中,在内存方面有没有更好的方法来解决这个问题? 提前重塑数组同样会导致内存效率低下。

谢谢。

一个简单的解决方案是遍历相邻维度。但是,如果 sizeof(arr[0][0]) 非常大,这可能不是最佳选择,因为 result 不适合 L1 缓存。最好的解决方案可能是使用阻塞来解决这个问题。

这是执行此操作的 C++ 示例代码:

// Blocked reduction using blocks of size 1024
// This loop iterate over the blocks
for(size_t j=0 ; j<65536 ; j+=1024)
{
    for(size_t k=j ; k<j+1024 ; ++k)
        result[k] = arr[0][k];

    // Summation of the current block
    for(size_t i=1 ; i<7 ; ++i)
        for(size_t k=j ; k<j+1024 ; ++k)
            result[k] += arr[i][k];

    for(size_t k=j ; k<j+1024 ; ++k)
        result[k] /= 7;
}

请注意,result 的类型必须足够大以容纳 smaller/bigger 的值 arr[0][0] 的 7 倍。此循环应由您的编译器矢量化,并应产生更少的缓存未命中,从而产生更快的代码。

PS:如果循环未矢量化,您可以通过在基于 k 的内部循环之前添加 OpenMP 指令 #pragma omp simd 来帮助您的编译器(并确保 OpenMP已启用)。

希望保持围绕内部数组的内部循环将充分利用高速缓存行(因此比围绕外部数组进行内部循环更快),您可以试试这个:

for(const auto& inner_arr : arr) {
    for(size_t i = 0; i < std::size(inner_arr); ++i) {
        result[i] += inner_arr[i];
    }
}
for(auto& v : result) v /= std::size(arr); // div down to mean value

但是你必须衡量它是否真的比你现在拥有的更快/更有效。

一个不同的方法可能正在使用 C++17 中添加的执行策略。我添加了一个 counting iterator 来让它工作。如果你有boost,你可以使用他们的计数迭代器

struct counting_iterator {
    using iterator_category = std::forward_iterator_tag;
    using value_type = size_t;
    using pointer = value_type*;
    using referece = value_type&;
    using difference_type = std::intmax_t;

    counting_iterator& operator++() { ++value; return *this; }
    counting_iterator operator++(int) { auto copy=*this; ++value; return copy; }
    bool operator==(const counting_iterator& rhs) const { return value == rhs.value; }
    bool operator!=(const counting_iterator& rhs) const { return value != rhs.value; }

    value_type operator*() const { return value; }

    size_t value;
}

// ...

std::for_each(std::execution::par_unseq, counting_iterator{0}, counting_iterator{65536},
    [&arr, &result](size_t i) {
        for(size_t o = 0; o < 7; ++o)
            result[i] += arr[o][i];
        result[i] /= 7;
    });

当使用 gcc 和 int 作为数据类型时,根据 quick-bench and ~2.1 times faster with clang,这比我上面的简单解决方案快 ~1.4 倍。这可能会有很大差异,具体取决于您可用的硬件线程数。